-/*
- * ===============================LICENSE_START======================================
- * dcae-analytics
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============================LICENSE_END===========================================
- */
-
-package org.openecomp.dcae.apod.analytics.tca.utils;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Table;
-import com.jayway.jsonpath.DocumentContext;
-import com.jayway.jsonpath.JsonPath;
-import com.jayway.jsonpath.TypeRef;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
-import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
-import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
-import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
-import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;
-import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;
-import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
-import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;
-import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
-import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
-import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
-import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
-import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;
-import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
-import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
-import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
-import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
-import org.quartz.Job;
-import org.quartz.JobBuilder;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.SimpleScheduleBuilder;
-import org.quartz.SimpleTrigger;
-import org.quartz.TriggerBuilder;
-import org.quartz.impl.StdSchedulerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import static com.google.common.collect.Lists.newArrayList;
-import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
-
-/**
- * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get
- * pre configured Json Object Mapper understand serialization and deserialization of CEF Message
- * and TCA Policy
- *
- * @author Rajiv Singla . Creation Date: 10/24/2016.
- */
-public abstract class TCAUtils extends AnalyticsModelJsonUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
-
- /**
- * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
- * WARNING )
- */
- private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
- @Override
- public int compare(Threshold threshold1, Threshold threshold2) {
- return threshold1.getSeverity().compareTo(threshold2.getSeverity());
- }
- };
-
- /**
- * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}
- *
- * @return TCA Policy Metrics Per Event Name list
- */
- public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {
- return new Function<TCAPolicy, List<MetricsPerEventName>>() {
- @Nullable
- @Override
- public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {
- return tcaPolicy.getMetricsPerEventName();
- }
- };
- }
-
- /**
- * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from
- * {@link MetricsPerEventName}
- *
- * @return Event Names or a Metrics Per Event Name object
- */
- public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {
- return new Function<MetricsPerEventName, String>() {
- @Override
- public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {
- return metricsPerEventName.getEventName();
- }
- };
- }
-
-
- /**
- * Extracts {@link TCAPolicy} Event Names
- *
- * @param tcaPolicy TCA Policy
- * @return List of event names in the TCA Policy
- */
- public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {
- final List<MetricsPerEventName> metricsPerEventNames =
- tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
-
- return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());
- }
-
- /**
- * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to
- * change during runtime
- *
- * @param tcaPolicy TCA Policy
- * @return a Supplier that memoize the TCA Policy event names
- */
- public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {
- return Suppliers.memoize(new Supplier<List<String>>() {
- @Override
- public List<String> get() {
- return getPolicyEventNames(tcaPolicy);
- }
- });
- }
-
-
- /**
- * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path
- *
- * @param tcaPolicy TCA Policy
- * @return A table with Keys of event name and field path containing List of threshold as values
- */
- public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {
- final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
- for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {
- final String eventName = metricsPerEventName.getEventName();
- final List<Threshold> thresholds = metricsPerEventName.getThresholds();
- for (Threshold threshold : thresholds) {
- final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());
- if (existingThresholds == null) {
- final LinkedList<Threshold> newThresholdList = new LinkedList<>();
- newThresholdList.add(threshold);
- domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);
- } else {
- domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);
- }
- }
- }
- return domainFRTable;
- }
-
-
- /**
- * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table
- *
- * @param tcaPolicy TCA Policy
- * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values
- */
- public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier
- (final TCAPolicy tcaPolicy) {
- return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
- @Override
- public Table<String, String, List<Threshold>> get() {
- return getPolicyEventNameThresholdsTable(tcaPolicy);
- }
- });
- }
-
-
- /**
- * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
- * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to
- * filter out messages which does not match policy domain or event Name
- *
- * @param cefMessage CEF Message
- * @param tcaPolicy TCA Policy
- * @return Message Process Context after processing filter chain
- */
- public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
- @Nonnull final TCAPolicy tcaPolicy) {
-
- final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
- final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
- final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();
- // Create a list of message processors
- final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
- ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);
- final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
- // Create a message processors chain
- final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
- new GenericMessageChainProcessor<>(messageProcessors, processorContext);
- // process chain
- return tcaProcessingChain.processChain();
- }
-
-
- /**
- * Extracts json path values for given json Field Paths from using Json path notation. Assumes
- * that values extracted are always long
- *
- * @param message CEF Message
- * @param jsonFieldPaths Json Field Paths
- * @return Map containing key as json path and values as values associated with that json path
- */
- public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
- jsonFieldPaths) {
-
- final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>();
- final DocumentContext documentContext = JsonPath.parse(message);
-
- for (String jsonFieldPath : jsonFieldPaths) {
- final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() {
- });
- // If Json Field Values are not or empty
- if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
- // Filter out all null values in the filed values list
- final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
- Predicates.<Long>notNull()));
- // If there are non null values put them in the map
- if (!nonNullValues.isEmpty()) {
- jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
- }
- }
- }
-
- return jsonFieldPathMap;
- }
-
- /**
- * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
- * it applies threshold in order of their severity and record the first threshold per message field path
- *
- * @param messageFieldValues Field Path Values extracted from CEF Message
- * @param fieldThresholds Policy Thresholds for Field Path
- * @return Optional of violated threshold for a field path
- */
- public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold>
- fieldThresholds) {
- // order thresholds by severity
- Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
- // Now apply each threshold to field values
- for (Threshold fieldThreshold : fieldThresholds) {
- for (Long messageFieldValue : messageFieldValues) {
- final Boolean isThresholdViolated =
- fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue());
- if (isThresholdViolated) {
- final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
- violatedThreshold.setActualFieldValue(messageFieldValue);
- return Optional.of(violatedThreshold);
- }
- }
- }
- return Optional.absent();
- }
-
- /**
- * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
- * Grabs first highest priority violated threshold
- *
- * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
- * @return First Highest priority violated threshold
- */
- public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
-
- final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
-
- if (violatedThresholds.size() == 1) {
- return violatedThresholds.get(0);
- }
- Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
- // Just grab the first violated threshold with highest priority
- return violatedThresholds.get(0);
- }
-
-
- /**
- * Creates {@link MetricsPerEventName} object which contains violated thresholds
- *
- * @param tcaPolicy TCA Policy
- * @param violatedThreshold Violated thresholds
- * @param eventName Event Name
- *
- * @return MetricsPerEventName object containing one highest severity violated threshold
- */
- public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
- @Nonnull final Threshold violatedThreshold,
- @Nonnull final String eventName) {
-
- final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(
- Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {
- @Override
- public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {
- return metricsPerEventName.getEventName().equals(eventName);
- }
- }));
- // TCA policy must have only one metrics per event Name
- if (metricsPerEventNames.size() == 1) {
- final MetricsPerEventName violatedMetrics =
- MetricsPerEventName.copy(metricsPerEventNames.get(0));
- violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
- return violatedMetrics;
- } else {
- final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);
- throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
- }
- }
-
- /**
- * Computes threshold violations
- *
- * @param processorContext Filtered processor Context
- * @return processor context with any threshold violations
- */
- public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
- final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
- return policyThresholdsProcessor.apply(processorContext);
- }
-
-
- /**
- * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
- * formats
- *
- * @param processorContextWithViolations processor context which has TCA violations
- * @param tcaAppName tca app name
- * @param isAlertInCEFFormat determines if output alert is in CEF format
- *
- * @return TCA Alert String
- *
- * @throws JsonProcessingException If alert cannot be parsed into JSON String
- */
- public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
- final String tcaAppName,
- final Boolean isAlertInCEFFormat) throws JsonProcessingException {
- if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
- final EventListener eventListenerWithViolations =
- addThresholdViolationFields(processorContextWithViolations);
- final String alertString = writeValueAsString(eventListenerWithViolations);
- LOG.debug("Created alert in CEF Format: {}", alertString);
- return alertString;
- } else {
- final TCAVESResponse newTCAVESResponse =
- createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
- final String alertString = writeValueAsString(newTCAVESResponse);
- LOG.debug("Created alert in Non CEF Format: {}", alertString);
- return alertString;
- }
- }
-
- /**
- * Adds threshold violation fields to {@link EventListener}
- *
- * @param processorContextWithViolations processor context that contains violations
- * @return event listener with threshold crossing alert fields populated
- */
- public static EventListener addThresholdViolationFields(
- final TCACEFProcessorContext processorContextWithViolations) {
-
- final MetricsPerEventName metricsPerEventName =
- processorContextWithViolations.getMetricsPerEventName();
- // confirm violations are indeed present
- if (metricsPerEventName == null) {
- final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
- throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
- }
-
- // get violated threshold
- final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
- final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
- final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
-
- // create new threshold crossing alert fields
- final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
- thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
- thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
- thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
- thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
- thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
- thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
- thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
- thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());
-
- // create new performance count
- final PerformanceCounter performanceCounter = new PerformanceCounter();
- performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
- performanceCounter.setName(violatedThreshold.getFieldPath());
- performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
- performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
-
- // set additional parameters for threshold crossing alert fields
- thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
-
- // add threshold crossing fields to existing event listener
- eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
-
- return eventListener;
- }
-
- /**
- * Converts {@link EventSeverity} to {@link Criticality}
- *
- * @param eventSeverity event severity
- *
- * @return performance counter criticality
- */
- private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
- switch (eventSeverity) {
- case CRITICAL:
- return Criticality.CRIT;
- case MAJOR:
- return Criticality.MAJ;
- default:
- return Criticality.UNKNOWN;
- }
- }
-
- /**
- * Creates {@link TCAVESResponse} object
- *
- * @param processorContext processor Context with violations
- * @param tcaAppName TCA App Name
- *
- * @return TCA VES Response Message
- */
- public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
- final String tcaAppName) {
-
- final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
- // confirm violations are indeed present
- if (metricsPerEventName == null) {
- final String errorMessage = "No violations metrics. Unable to create VES Response";
- throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
- }
-
- final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
- final EventListener eventListener = processorContext.getCEFEventListener();
- final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
-
- final TCAVESResponse tcavesResponse = new TCAVESResponse();
- // ClosedLoopControlName included in the DCAE configuration Policy
- tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
- // version included in the DCAE configuration Policy
- tcavesResponse.setVersion(violatedThreshold.getVersion());
- // Generate a UUID for this output message
- tcavesResponse.setRequestID(UUID.randomUUID().toString());
- // commonEventHeader.startEpochMicrosec from the received VES measurementsForVfScaling message
- tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
- // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
- // TODO: Find out how to get this field
- tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
-
- final AAI aai = new AAI();
- tcavesResponse.setAai(aai);
-
- // VM specific settings
- if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {
- // Hard Coded - "VM"
- tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);
- // Hard Coded - "vserver.vserver-name"
- tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);
- aai.setGenericServerId(commonEventHeader.getReportingEntityName());
- } else {
- // VNF specific settings
- // Hard Coded - "VNF"
- tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);
- // Hard Coded - "generic-vnf.vnf-id"
- tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);
- // commonEventHeader.reportingEntityName from the received VES measurementsForVfScaling message (value for
- // the data element used in A&AI)
- aai.setGenericVNFId(commonEventHeader.getReportingEntityName());
- }
-
- // Hard Coded - "DCAE"
- tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
- // policyScope included in the DCAE configuration Policy
- tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());
- // policyName included in the DCAE configuration Policy
- tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());
- // policyVersion included in the DCAE configuration Policy
- tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());
- // Extracted from violated threshold
- tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());
-
- return tcavesResponse;
- }
-
-
- /**
- * Extract Domain and Event Name from processor context if present
- *
- * @param processorContext processor context
- * @return Tuple of domain and event Name
- */
- public static Pair<String, String> getDomainAndEventName(
- @Nullable final TCACEFProcessorContext processorContext) {
-
- String domain = null;
- String eventName = null;
-
- if (processorContext != null &&
- processorContext.getCEFEventListener() != null &&
- processorContext.getCEFEventListener().getEvent() != null &&
- processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
- final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
- .getCommonEventHeader();
-
- if (commonEventHeader.getDomain() != null) {
- domain = commonEventHeader.getDomain().name();
- }
-
- if (commonEventHeader.getEventName() != null) {
- eventName = commonEventHeader.getEventName();
- }
-
- }
-
- return new ImmutablePair<>(domain, eventName);
-
- }
-
- /**
- * Creates {@link TCAPolicy} Metrics per Event Name list
- *
- * @param eventNamesMap Map containing event Name as key and corresponding values
- *
- * @return List of {@link MetricsPerEventName}
- */
- public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(
- final Map<String, Map<String, String>> eventNamesMap) {
-
- // create a new metrics per event Name list
- final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();
-
- for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {
-
- // create new metrics per event Name instance
- final MetricsPerEventName newMetricsPerEventName =
- createNewMetricsPerEventName(eventNamesEntry);
- metricsPerEventNames.add(newMetricsPerEventName);
-
- // determine all threshold related values
- final Map<String, String> thresholdsValuesMaps =
- filterMapByKeyNamePrefix(eventNamesEntry.getValue(),
- AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
-
- // create a map of all threshold values
- final Map<String, Map<String, String>> thresholdsMap =
- extractSubTree(thresholdsValuesMaps, 1, 2,
- AnalyticsConstants.TCA_POLICY_DELIMITER);
-
- // add thresholds to nmetrics per event Names threshold list
- for (Map<String, String> thresholdMap : thresholdsMap.values()) {
- newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));
- }
-
- }
-
- return metricsPerEventNames;
- }
-
- /**
- * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
- *
- * @param thresholdMap threshold map with threshold values
- *
- * @return new instance of TCA Policy Threshold
- */
- public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
- final Threshold threshold = new Threshold();
- threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
- threshold.setVersion(thresholdMap.get("policy.version"));
- threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
- threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
- threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
- threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
- threshold.setClosedLoopEventStatus(
- ControlLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));
- return threshold;
- }
-
- /**
- * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope
- * extracted from given eventNamesEntry
- *
- * @param eventNamesEntry Event Names Entry
- *
- * @return new instance of MetricsPerEventName
- */
- public static MetricsPerEventName createNewMetricsPerEventName(
- final Map.Entry<String, Map<String, String>> eventNamesEntry) {
- // determine event Name
- final String eventName = eventNamesEntry.getKey();
- // determine event Name thresholds
- final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();
- final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();
- final List<Threshold> thresholds = new LinkedList<>();
- metricsPerEventName.setThresholds(thresholds);
- metricsPerEventName.setEventName(eventName);
- // bind policyName, policyVersion, policyScope and closedLoopControlName
- metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));
- metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));
- metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));
- metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(
- metricsPerEventNameThresholdsMap.get("closedLoopControlName")));
- return metricsPerEventName;
- }
-
- /**
- * Converts a flattened key/value map which has keys delimited by a given delimiter.
- * The start Index and end index extract the sub-key value and returns a new map containing
- * sub-keys and values.
- *
- * @param actualMap actual Map
- * @param startIndex start index
- * @param endIndex end index
- * @param delimiter delimiter
- *
- * @return Map with new sub tree map
- */
- public static Map<String, Map<String, String>> extractSubTree(
- final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
-
- final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
-
- // iterate over actual map entries
- for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
- final String actualMapKey = actualMapEntry.getKey();
- final String actualMapValue = actualMapEntry.getValue();
-
- // determine delimiter start and end index
- final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
- final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
- final int keyLength = actualMapKey.length();
-
- // extract sub-tree map
- if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
- final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
- final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
- final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
- if (existingThresholdMap == null) {
- Map<String, String> newThresholdMap = new LinkedHashMap<>();
- newThresholdMap.put(subMapKey, actualMapValue);
- subTreeMap.put(thresholdKey, newThresholdMap);
- } else {
- existingThresholdMap.put(subMapKey, actualMapValue);
- }
-
- }
- }
-
- return subTreeMap;
-
- }
-
-
- /**
- * Provides a view of underlying map that filters out entries with keys starting with give prefix
- *
- * @param actualMap Target map that needs to be filtered
- * @param keyNamePrefix key prefix
- *
- * @return a view of actual map which only show entries which have give prefix
- */
- public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
- final String keyNamePrefix) {
- return Maps.filterKeys(actualMap,
- new Predicate<String>() {
- @Override
- public boolean apply(@Nullable String key) {
- return key != null && key.startsWith(keyNamePrefix);
- }
- });
- }
-
-
- /**
- * Creates Quartz Scheduler
- *
- * @param pollingIntervalMS polling interval
- * @param stdSchedulerFactory Quartz standard schedule factory instance
- * @param quartzPublisherPropertiesFileName quartz properties file name
- * @param jobDataMap job Data map
- * @param quartzJobClass Quartz Job Class
- * @param quartzJobName Quartz Job Name
- * @param quartzTriggerName Quartz Trigger name
- *
- * @param <T> An implementation of Quartz {@link Job} interface
- * @return Configured Quartz Scheduler
- *
- * @throws SchedulerException exception if unable to create to Quartz Scheduler
- */
- public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
- final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
- final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
- final String quartzTriggerName) throws SchedulerException {
-
- // Initialize a new Quartz Standard scheduler
- LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
- quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
- final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
- quartzPublisherPropertiesFileName, new Properties());
- stdSchedulerFactory.initialize(quartzProperties);
- final Scheduler scheduler = stdSchedulerFactory.getScheduler();
-
- // Create a new job detail
- final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
- AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
-
- // Create a new scheduling builder
- final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
- .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
- .repeatForever(); // repeats while worker is running
-
- // Create a trigger for the TCA Publisher Job
- final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
- .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
- .startNow() // job starts right away
- .withSchedule(simpleScheduleBuilder).build();
-
- scheduler.scheduleJob(jobDetail, simpleTrigger);
- LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());
- return scheduler;
- }
-
-}
+/*\r
+ * ===============================LICENSE_START======================================\r
+ * dcae-analytics\r
+ * ================================================================================\r
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============================LICENSE_END===========================================\r
+ */\r
+\r
+package org.openecomp.dcae.apod.analytics.tca.utils;\r
+\r
+import com.fasterxml.jackson.core.JsonProcessingException;\r
+import com.fasterxml.jackson.databind.JsonNode;\r
+import com.google.common.base.Function;\r
+import com.google.common.base.Optional;\r
+import com.google.common.base.Predicate;\r
+import com.google.common.base.Predicates;\r
+import com.google.common.base.Supplier;\r
+import com.google.common.base.Suppliers;\r
+import com.google.common.collect.HashBasedTable;\r
+import com.google.common.collect.ImmutableList;\r
+import com.google.common.collect.ImmutableMap;\r
+import com.google.common.collect.Iterables;\r
+import com.google.common.collect.Lists;\r
+import com.google.common.collect.Maps;\r
+import com.google.common.collect.Table;\r
+import com.jayway.jsonpath.DocumentContext;\r
+import com.jayway.jsonpath.JsonPath;\r
+import com.jayway.jsonpath.TypeRef;\r
+import org.apache.commons.lang3.StringUtils;\r
+import org.apache.commons.lang3.tuple.ImmutablePair;\r
+import org.apache.commons.lang3.tuple.Pair;\r
+import org.openecomp.dcae.apod.analytics.aai.service.AAIEnrichmentClient;\r
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
+import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;\r
+import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;\r
+import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;\r
+import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;\r
+import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;\r
+import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;\r
+import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;\r
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;\r
+import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;\r
+import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;\r
+import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;\r
+import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;\r
+import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;\r
+import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;\r
+import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;\r
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;\r
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;\r
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;\r
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;\r
+import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;\r
+import org.quartz.Job;\r
+import org.quartz.JobBuilder;\r
+import org.quartz.JobDataMap;\r
+import org.quartz.JobDetail;\r
+import org.quartz.Scheduler;\r
+import org.quartz.SchedulerException;\r
+import org.quartz.SimpleScheduleBuilder;\r
+import org.quartz.SimpleTrigger;\r
+import org.quartz.TriggerBuilder;\r
+import org.quartz.impl.StdSchedulerFactory;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import java.io.IOException;\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.Comparator;\r
+import java.util.Date;\r
+import java.util.HashMap;\r
+import java.util.Iterator;\r
+import java.util.LinkedHashMap;\r
+import java.util.LinkedList;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Properties;\r
+import java.util.Set;\r
+import java.util.SortedMap;\r
+import java.util.TreeMap;\r
+import java.util.UUID;\r
+\r
+import javax.annotation.Nonnull;\r
+import javax.annotation.Nullable;\r
+\r
+import static com.google.common.collect.Lists.newArrayList;\r
+import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;\r
+\r
+/**\r
+ * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get\r
+ * pre configured Json Object Mapper understand serialization and deserialization of CEF Message\r
+ * and TCA Policy\r
+ *\r
+ * @author Rajiv Singla . Creation Date: 10/24/2016.\r
+ */\r
+public abstract class TCAUtils extends AnalyticsModelJsonUtils {\r
+\r
+ private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);\r
+\r
+ /**\r
+ * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,\r
+ * WARNING )\r
+ */\r
+ private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {\r
+ @Override\r
+ public int compare(Threshold threshold1, Threshold threshold2) {\r
+ return threshold1.getSeverity().compareTo(threshold2.getSeverity());\r
+ }\r
+ };\r
+\r
+ /**\r
+ * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}\r
+ *\r
+ * @return TCA Policy Metrics Per Event Name list\r
+ */\r
+ public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {\r
+ return new Function<TCAPolicy, List<MetricsPerEventName>>() {\r
+ @Nullable\r
+ @Override\r
+ public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {\r
+ return tcaPolicy.getMetricsPerEventName();\r
+ }\r
+ };\r
+ }\r
+\r
+ /**\r
+ * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from\r
+ * {@link MetricsPerEventName}\r
+ *\r
+ * @return Event Names or a Metrics Per Event Name object\r
+ */\r
+ public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {\r
+ return new Function<MetricsPerEventName, String>() {\r
+ @Override\r
+ public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {\r
+ return metricsPerEventName.getEventName();\r
+ }\r
+ };\r
+ }\r
+\r
+\r
+ /**\r
+ * Extracts {@link TCAPolicy} Event Names\r
+ *\r
+ * @param tcaPolicy TCA Policy\r
+ * @return List of event names in the TCA Policy\r
+ */\r
+ public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {\r
+ final List<MetricsPerEventName> metricsPerEventNames =\r
+ tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);\r
+\r
+ return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());\r
+ }\r
+\r
+ /**\r
+ * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to\r
+ * change during runtime\r
+ *\r
+ * @param tcaPolicy TCA Policy\r
+ * @return a Supplier that memoize the TCA Policy event names\r
+ */\r
+ public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {\r
+ return Suppliers.memoize(new Supplier<List<String>>() {\r
+ @Override\r
+ public List<String> get() {\r
+ return getPolicyEventNames(tcaPolicy);\r
+ }\r
+ });\r
+ }\r
+\r
+\r
+ /**\r
+ * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path\r
+ *\r
+ * @param tcaPolicy TCA Policy\r
+ * @return A table with Keys of event name and field path containing List of threshold as values\r
+ */\r
+ public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {\r
+ final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();\r
+ for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {\r
+ final String eventName = metricsPerEventName.getEventName();\r
+ final List<Threshold> thresholds = metricsPerEventName.getThresholds();\r
+ for (Threshold threshold : thresholds) {\r
+ final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());\r
+ if (existingThresholds == null) {\r
+ final LinkedList<Threshold> newThresholdList = new LinkedList<>();\r
+ newThresholdList.add(threshold);\r
+ domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);\r
+ } else {\r
+ domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);\r
+ }\r
+ }\r
+ }\r
+ return domainFRTable;\r
+ }\r
+\r
+\r
+ /**\r
+ * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table\r
+ *\r
+ * @param tcaPolicy TCA Policy\r
+ * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values\r
+ */\r
+ public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier\r
+ (final TCAPolicy tcaPolicy) {\r
+ return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {\r
+ @Override\r
+ public Table<String, String, List<Threshold>> get() {\r
+ return getPolicyEventNameThresholdsTable(tcaPolicy);\r
+ }\r
+ });\r
+ }\r
+\r
+\r
+ /**\r
+ * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},\r
+ * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to\r
+ * filter out messages which does not match policy domain or event Name\r
+ *\r
+ * @param cefMessage CEF Message\r
+ * @param tcaPolicy TCA Policy\r
+ * @return Message Process Context after processing filter chain\r
+ */\r
+ public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,\r
+ @Nonnull final TCAPolicy tcaPolicy) {\r
+\r
+ final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();\r
+ final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();\r
+ final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();\r
+ // Create a list of message processors\r
+ final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =\r
+ ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);\r
+ final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);\r
+ // Create a message processors chain\r
+ final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =\r
+ new GenericMessageChainProcessor<>(messageProcessors, processorContext);\r
+ // process chain\r
+ return tcaProcessingChain.processChain();\r
+ }\r
+\r
+\r
+ /**\r
+ * Extracts json path values for given json Field Paths from using Json path notation. Assumes\r
+ * that values extracted are always long\r
+ *\r
+ * @param message CEF Message\r
+ * @param jsonFieldPaths Json Field Paths\r
+ * @return Map containing key as json path and values as values associated with that json path\r
+ */\r
+ public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>\r
+ jsonFieldPaths) {\r
+\r
+ final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>();\r
+ final DocumentContext documentContext = JsonPath.parse(message);\r
+\r
+ for (String jsonFieldPath : jsonFieldPaths) {\r
+ final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() {\r
+ });\r
+ // If Json Field Values are not or empty\r
+ if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {\r
+ // Filter out all null values in the filed values list\r
+ final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,\r
+ Predicates.<Long>notNull()));\r
+ // If there are non null values put them in the map\r
+ if (!nonNullValues.isEmpty()) {\r
+ jsonFieldPathMap.put(jsonFieldPath, nonNullValues);\r
+ }\r
+ }\r
+ }\r
+\r
+ return jsonFieldPathMap;\r
+ }\r
+\r
+ /**\r
+ * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path\r
+ * it applies threshold in order of their severity and record the first threshold per message field path\r
+ *\r
+ * @param messageFieldValues Field Path Values extracted from CEF Message\r
+ * @param fieldThresholds Policy Thresholds for Field Path\r
+ * @return Optional of violated threshold for a field path\r
+ */\r
+ public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold>\r
+ fieldThresholds) {\r
+ // order thresholds by severity\r
+ Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);\r
+ // Now apply each threshold to field values\r
+ for (Threshold fieldThreshold : fieldThresholds) {\r
+ for (Long messageFieldValue : messageFieldValues) {\r
+ final Boolean isThresholdViolated =\r
+ fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue());\r
+ if (isThresholdViolated) {\r
+ final Threshold violatedThreshold = Threshold.copy(fieldThreshold);\r
+ violatedThreshold.setActualFieldValue(messageFieldValue);\r
+ return Optional.of(violatedThreshold);\r
+ }\r
+ }\r
+ }\r
+ return Optional.absent();\r
+ }\r
+\r
+ /**\r
+ * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.\r
+ * Grabs first highest priority violated threshold\r
+ *\r
+ * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds\r
+ * @return First Highest priority violated threshold\r
+ */\r
+ public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {\r
+\r
+ final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());\r
+\r
+ if (violatedThresholds.size() == 1) {\r
+ return violatedThresholds.get(0);\r
+ }\r
+ Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);\r
+ // Just grab the first violated threshold with highest priority\r
+ return violatedThresholds.get(0);\r
+ }\r
+\r
+\r
+ /**\r
+ * Creates {@link MetricsPerEventName} object which contains violated thresholds\r
+ *\r
+ * @param tcaPolicy TCA Policy\r
+ * @param violatedThreshold Violated thresholds\r
+ * @param eventName Event Name\r
+ *\r
+ * @return MetricsPerEventName object containing one highest severity violated threshold\r
+ */\r
+ public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,\r
+ @Nonnull final Threshold violatedThreshold,\r
+ @Nonnull final String eventName) {\r
+\r
+ final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(\r
+ Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {\r
+ @Override\r
+ public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {\r
+ return metricsPerEventName.getEventName().equals(eventName);\r
+ }\r
+ }));\r
+ // TCA policy must have only one metrics per event Name\r
+ if (metricsPerEventNames.size() == 1) {\r
+ final MetricsPerEventName violatedMetrics =\r
+ MetricsPerEventName.copy(metricsPerEventNames.get(0));\r
+ violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));\r
+ return violatedMetrics;\r
+ } else {\r
+ final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);\r
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Computes threshold violations\r
+ *\r
+ * @param processorContext Filtered processor Context\r
+ * @return processor context with any threshold violations\r
+ */\r
+ public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {\r
+ final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();\r
+ return policyThresholdsProcessor.apply(processorContext);\r
+ }\r
+\r
+\r
+ /**\r
+ * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}\r
+ * formats\r
+ *\r
+ * @param processorContextWithViolations processor context which has TCA violations\r
+ * @param tcaAppName tca app name\r
+ * @param isAlertInCEFFormat determines if output alert is in CEF format\r
+ *\r
+ * @return TCA Alert String\r
+ *\r
+ * @throws JsonProcessingException If alert cannot be parsed into JSON String\r
+ */\r
+ public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,\r
+ final String tcaAppName,\r
+ final Boolean isAlertInCEFFormat) throws JsonProcessingException {\r
+ if (isAlertInCEFFormat != null && isAlertInCEFFormat) {\r
+ final EventListener eventListenerWithViolations =\r
+ addThresholdViolationFields(processorContextWithViolations);\r
+ final String alertString = writeValueAsString(eventListenerWithViolations);\r
+ LOG.debug("Created alert in CEF Format: {}", alertString);\r
+ return alertString;\r
+ } else {\r
+ final TCAVESResponse newTCAVESResponse =\r
+ createNewTCAVESResponse(processorContextWithViolations, tcaAppName);\r
+ final String alertString = writeValueAsString(newTCAVESResponse);\r
+ LOG.debug("Created alert in Non CEF Format: {}", alertString);\r
+ return alertString;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Adds threshold violation fields to {@link EventListener}\r
+ *\r
+ * @param processorContextWithViolations processor context that contains violations\r
+ * @return event listener with threshold crossing alert fields populated\r
+ */\r
+ public static EventListener addThresholdViolationFields(\r
+ final TCACEFProcessorContext processorContextWithViolations) {\r
+\r
+ final MetricsPerEventName metricsPerEventName =\r
+ processorContextWithViolations.getMetricsPerEventName();\r
+ // confirm violations are indeed present\r
+ if (metricsPerEventName == null) {\r
+ final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";\r
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
+ }\r
+\r
+ // get violated threshold\r
+ final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);\r
+ final EventListener eventListener = processorContextWithViolations.getCEFEventListener();\r
+ final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();\r
+\r
+ // create new threshold crossing alert fields\r
+ final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();\r
+ thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());\r
+ thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());\r
+ thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));\r
+ thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);\r
+ thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);\r
+ thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());\r
+ thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());\r
+ thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());\r
+\r
+ // create new performance count\r
+ final PerformanceCounter performanceCounter = new PerformanceCounter();\r
+ performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));\r
+ performanceCounter.setName(violatedThreshold.getFieldPath());\r
+ performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());\r
+ performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());\r
+\r
+ // set additional parameters for threshold crossing alert fields\r
+ thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));\r
+\r
+ // add threshold crossing fields to existing event listener\r
+ eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);\r
+\r
+ return eventListener;\r
+ }\r
+\r
+ /**\r
+ * Converts {@link EventSeverity} to {@link Criticality}\r
+ *\r
+ * @param eventSeverity event severity\r
+ *\r
+ * @return performance counter criticality\r
+ */\r
+ private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {\r
+ switch (eventSeverity) {\r
+ case CRITICAL:\r
+ return Criticality.CRIT;\r
+ case MAJOR:\r
+ return Criticality.MAJ;\r
+ default:\r
+ return Criticality.UNKNOWN;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Creates {@link TCAVESResponse} object\r
+ *\r
+ * @param processorContext processor Context with violations\r
+ * @param tcaAppName TCA App Name\r
+ *\r
+ * @return TCA VES Response Message\r
+ */\r
+ public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,\r
+ final String tcaAppName) {\r
+\r
+ final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();\r
+ // confirm violations are indeed present\r
+ if (metricsPerEventName == null) {\r
+ final String errorMessage = "No violations metrics. Unable to create VES Response";\r
+ throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
+ }\r
+\r
+ final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);\r
+ final EventListener eventListener = processorContext.getCEFEventListener();\r
+ final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();\r
+\r
+ final TCAVESResponse tcavesResponse = new TCAVESResponse();\r
+ // ClosedLoopControlName included in the DCAE configuration Policy\r
+ tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());\r
+ // version included in the DCAE configuration Policy\r
+ tcavesResponse.setVersion(violatedThreshold.getVersion());\r
+ // Generate a UUID for this output message\r
+ tcavesResponse.setRequestID(UUID.randomUUID().toString());\r
+ // commonEventHeader.startEpochMicrosec from the received VES message\r
+ tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());\r
+ // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts\r
+ if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) {\r
+ tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec());\r
+ }\r
+ // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot\r
+ tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);\r
+\r
+ final AAI aai = new AAI();\r
+ tcavesResponse.setAai(aai);\r
+\r
+ // VM specific settings\r
+ if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {\r
+ // Hard Coded - "VM"\r
+ tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);\r
+ // Hard Coded - "vserver.vserver-name"\r
+ tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);\r
+ // commonEventHeader.sourceName from the received VES message\r
+ aai.setGenericServerId(commonEventHeader.getSourceName());\r
+ } else {\r
+ // VNF specific settings\r
+ // Hard Coded - "VNF"\r
+ tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);\r
+ // Hard Coded - "generic-vnf.vnf-id"\r
+ tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);\r
+ // commonEventHeader.sourceName from the received VES message\r
+ aai.setGenericVNFId(commonEventHeader.getSourceName());\r
+ }\r
+\r
+ // Hard Coded - "DCAE"\r
+ tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);\r
+ // policyScope included in the DCAE configuration Policy\r
+ tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());\r
+ // policyName included in the DCAE configuration Policy\r
+ tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());\r
+ // policyVersion included in the DCAE configuration Policy\r
+ tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());\r
+ // Extracted from violated threshold\r
+ tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());\r
+\r
+ return tcavesResponse;\r
+ }\r
+\r
+\r
+ /**\r
+ * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert\r
+ *\r
+ * @param tcavesResponse alert\r
+ *\r
+ * @return control Loop Schema Type\r
+ */\r
+ public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) {\r
+ final AAI aai = tcavesResponse.getAai();\r
+ if (aai.getGenericServerId() != null) {\r
+ return ControlLoopSchemaType.VM;\r
+ } else {\r
+ return ControlLoopSchemaType.VNF;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert\r
+ *\r
+ * @param tcavesResponse {@link TCAVESResponse} TCA alert\r
+ *\r
+ * @return Source name\r
+ */\r
+ public static String determineSourceName(final TCAVESResponse tcavesResponse) {\r
+ final AAI aai = tcavesResponse.getAai();\r
+ if (aai.getGenericServerId() != null) {\r
+ return aai.getGenericServerId();\r
+ } else {\r
+ return aai.getGenericVNFId();\r
+ }\r
+ }\r
+\r
+\r
+ /**\r
+ * Extract Domain and Event Name from processor context if present\r
+ *\r
+ * @param processorContext processor context\r
+ * @return Tuple of domain and event Name\r
+ */\r
+ public static Pair<String, String> getDomainAndEventName(\r
+ @Nullable final TCACEFProcessorContext processorContext) {\r
+\r
+ String domain = null;\r
+ String eventName = null;\r
+\r
+ if (processorContext != null &&\r
+ processorContext.getCEFEventListener() != null &&\r
+ processorContext.getCEFEventListener().getEvent() != null &&\r
+ processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {\r
+ final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()\r
+ .getCommonEventHeader();\r
+\r
+ if (commonEventHeader.getDomain() != null) {\r
+ domain = commonEventHeader.getDomain().name();\r
+ }\r
+\r
+ if (commonEventHeader.getEventName() != null) {\r
+ eventName = commonEventHeader.getEventName();\r
+ }\r
+\r
+ }\r
+\r
+ return new ImmutablePair<>(domain, eventName);\r
+\r
+ }\r
+\r
+ /**\r
+ * Creates {@link TCAPolicy} Metrics per Event Name list\r
+ *\r
+ * @param eventNamesMap Map containing event Name as key and corresponding values\r
+ *\r
+ * @return List of {@link MetricsPerEventName}\r
+ */\r
+ public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(\r
+ final Map<String, Map<String, String>> eventNamesMap) {\r
+\r
+ // create a new metrics per event Name list\r
+ final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();\r
+\r
+ for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {\r
+\r
+ // create new metrics per event Name instance\r
+ final MetricsPerEventName newMetricsPerEventName =\r
+ createNewMetricsPerEventName(eventNamesEntry);\r
+ metricsPerEventNames.add(newMetricsPerEventName);\r
+\r
+ // determine all threshold related values\r
+ final Map<String, String> thresholdsValuesMaps =\r
+ filterMapByKeyNamePrefix(eventNamesEntry.getValue(),\r
+ AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);\r
+\r
+ // create a map of all threshold values\r
+ final Map<String, Map<String, String>> thresholdsMap =\r
+ extractSubTree(thresholdsValuesMaps, 1, 2,\r
+ AnalyticsConstants.TCA_POLICY_DELIMITER);\r
+\r
+ // add thresholds to nmetrics per event Names threshold list\r
+ for (Map<String, String> thresholdMap : thresholdsMap.values()) {\r
+ newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));\r
+ }\r
+\r
+ }\r
+\r
+ return metricsPerEventNames;\r
+ }\r
+\r
+ /**\r
+ * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap\r
+ *\r
+ * @param thresholdMap threshold map with threshold values\r
+ *\r
+ * @return new instance of TCA Policy Threshold\r
+ */\r
+ public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {\r
+ final Threshold threshold = new Threshold();\r
+ threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));\r
+ threshold.setVersion(thresholdMap.get("policy.version"));\r
+ threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));\r
+ threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));\r
+ threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));\r
+ threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));\r
+ threshold.setClosedLoopEventStatus(\r
+ ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));\r
+ return threshold;\r
+ }\r
+\r
+ /**\r
+ * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope\r
+ * extracted from given eventNamesEntry\r
+ *\r
+ * @param eventNamesEntry Event Names Entry\r
+ *\r
+ * @return new instance of MetricsPerEventName\r
+ */\r
+ public static MetricsPerEventName createNewMetricsPerEventName(\r
+ final Map.Entry<String, Map<String, String>> eventNamesEntry) {\r
+ // determine event Name\r
+ final String eventName = eventNamesEntry.getKey();\r
+ // determine event Name thresholds\r
+ final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();\r
+ final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();\r
+ final List<Threshold> thresholds = new LinkedList<>();\r
+ metricsPerEventName.setThresholds(thresholds);\r
+ metricsPerEventName.setEventName(eventName);\r
+ // bind policyName, policyVersion, policyScope and closedLoopControlName\r
+ metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));\r
+ metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));\r
+ metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));\r
+ metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(\r
+ metricsPerEventNameThresholdsMap.get("controlLoopSchemaType")));\r
+ return metricsPerEventName;\r
+ }\r
+\r
+ /**\r
+ * Converts a flattened key/value map which has keys delimited by a given delimiter.\r
+ * The start Index and end index extract the sub-key value and returns a new map containing\r
+ * sub-keys and values.\r
+ *\r
+ * @param actualMap actual Map\r
+ * @param startIndex start index\r
+ * @param endIndex end index\r
+ * @param delimiter delimiter\r
+ *\r
+ * @return Map with new sub tree map\r
+ */\r
+ public static Map<String, Map<String, String>> extractSubTree(\r
+ final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {\r
+\r
+ final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();\r
+\r
+ // iterate over actual map entries\r
+ for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {\r
+ final String actualMapKey = actualMapEntry.getKey();\r
+ final String actualMapValue = actualMapEntry.getValue();\r
+\r
+ // determine delimiter start and end index\r
+ final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);\r
+ final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);\r
+ final int keyLength = actualMapKey.length();\r
+\r
+ // extract sub-tree map\r
+ if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {\r
+ final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);\r
+ final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);\r
+ final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);\r
+ if (existingThresholdMap == null) {\r
+ Map<String, String> newThresholdMap = new LinkedHashMap<>();\r
+ newThresholdMap.put(subMapKey, actualMapValue);\r
+ subTreeMap.put(thresholdKey, newThresholdMap);\r
+ } else {\r
+ existingThresholdMap.put(subMapKey, actualMapValue);\r
+ }\r
+\r
+ }\r
+ }\r
+\r
+ return subTreeMap;\r
+\r
+ }\r
+\r
+\r
+ /**\r
+ * Provides a view of underlying map that filters out entries with keys starting with give prefix\r
+ *\r
+ * @param actualMap Target map that needs to be filtered\r
+ * @param keyNamePrefix key prefix\r
+ *\r
+ * @return a view of actual map which only show entries which have give prefix\r
+ */\r
+ public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,\r
+ final String keyNamePrefix) {\r
+ return Maps.filterKeys(actualMap,\r
+ new Predicate<String>() {\r
+ @Override\r
+ public boolean apply(@Nullable String key) {\r
+ return key != null && key.startsWith(keyNamePrefix);\r
+ }\r
+ });\r
+ }\r
+\r
+\r
+ /**\r
+ * Creates Quartz Scheduler\r
+ *\r
+ * @param pollingIntervalMS polling interval\r
+ * @param stdSchedulerFactory Quartz standard schedule factory instance\r
+ * @param quartzPublisherPropertiesFileName quartz properties file name\r
+ * @param jobDataMap job Data map\r
+ * @param quartzJobClass Quartz Job Class\r
+ * @param quartzJobName Quartz Job Name\r
+ * @param quartzTriggerName Quartz Trigger name\r
+ *\r
+ * @param <T> An implementation of Quartz {@link Job} interface\r
+ * @return Configured Quartz Scheduler\r
+ *\r
+ * @throws SchedulerException exception if unable to create to Quartz Scheduler\r
+ */\r
+ public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,\r
+ final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,\r
+ final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,\r
+ final String quartzTriggerName) throws SchedulerException {\r
+\r
+ // Initialize a new Quartz Standard scheduler\r
+ LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",\r
+ quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);\r
+ final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(\r
+ quartzPublisherPropertiesFileName, new Properties());\r
+ stdSchedulerFactory.initialize(quartzProperties);\r
+ final Scheduler scheduler = stdSchedulerFactory.getScheduler();\r
+\r
+ // Create a new job detail\r
+ final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,\r
+ AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();\r
+\r
+ // Create a new scheduling builder\r
+ final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()\r
+ .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule\r
+ .repeatForever(); // repeats while worker is running\r
+\r
+ // Create a trigger for the TCA Publisher Job\r
+ final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()\r
+ .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)\r
+ .startNow() // job starts right away\r
+ .withSchedule(simpleScheduleBuilder).build();\r
+\r
+ scheduler.scheduleJob(jobDetail, simpleTrigger);\r
+ LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());\r
+ return scheduler;\r
+ }\r
+\r
+\r
+ /**\r
+ * Does A&AI Enrichment for VM\r
+ *\r
+ * @param tcavesResponse Outgoing alert object\r
+ * @param aaiEnrichmentClient A&AI Enrichment client\r
+ * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path\r
+ * @param alertString alert String\r
+ * @param vmSourceName vm source name\r
+ */\r
+ public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse,\r
+ final AAIEnrichmentClient aaiEnrichmentClient,\r
+ final String aaiVMEnrichmentAPIPath,\r
+ final String alertString,\r
+ final String vmSourceName) {\r
+\r
+ final String filterString = "vserver-name:EQUALS:" + vmSourceName;\r
+ final ImmutableMap<String, String> queryParams = ImmutableMap.of(\r
+ "search-node-type", "vserver", "filter", filterString);\r
+\r
+ // fetch vm object resource Link from A&AI\r
+ final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
+ aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());\r
+ final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails);\r
+\r
+ if (vmObjectResourceLink == null) {\r
+ LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " +\r
+ "determined for vmSourceName: {}.", alertString, vmSourceName);\r
+ } else {\r
+\r
+ LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}",\r
+ vmSourceName, vmObjectResourceLink);\r
+\r
+ // fetch vm A&AI Enrichment\r
+ final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
+ vmObjectResourceLink, Collections.<String, String>emptyMap(), createAAIEnrichmentHeaders());\r
+\r
+ // enrich AAI\r
+ enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString,\r
+ AnalyticsConstants.AAI_VSERVER_KEY_PREFIX);\r
+ }\r
+\r
+\r
+ }\r
+\r
+\r
+ /**\r
+ * Does A&AI Enrichment for VNF\r
+ *\r
+ * @param tcavesResponse Outgoing alert object\r
+ * @param aaiEnrichmentClient A&AI Enrichment client\r
+ * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path\r
+ * @param alertString alert String\r
+ * @param vnfSourceName vnf source name\r
+ */\r
+ public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse,\r
+ final AAIEnrichmentClient aaiEnrichmentClient,\r
+ final String aaiVNFEnrichmentAPIPath,\r
+ final String alertString,\r
+ final String vnfSourceName) {\r
+ final ImmutableMap<String, String> queryParams = ImmutableMap.of("vnf-name", vnfSourceName);\r
+\r
+ // fetch vnf A&AI Enrichment\r
+ final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
+ aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());\r
+\r
+ // enrich alert AAI\r
+ enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX);\r
+ }\r
+\r
+ /**\r
+ * Fetches VM Object Resource Link from A&AI Resource Link Json\r
+ *\r
+ * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json\r
+ *\r
+ * @return object resource link String\r
+ */\r
+ private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) {\r
+ if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) {\r
+ try {\r
+ final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails);\r
+ final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link");\r
+ if (!resourceLinkJsonNode.isMissingNode()) {\r
+ return resourceLinkJsonNode.asText();\r
+ }\r
+ } catch (IOException e) {\r
+ LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}",\r
+ vmAAIResourceLinkDetails, e);\r
+ }\r
+ }\r
+ return null;\r
+ }\r
+\r
+ /**\r
+ * Creates Http Headers for A&AI Enrichment client\r
+ *\r
+ * @return Http Headers Map for A&AI Enrichment client\r
+ */\r
+ private static Map<String, String> createAAIEnrichmentHeaders() {\r
+ final Map<String, String> aaiEnrichmentHeaders = new LinkedHashMap<>();\r
+ final String transactionId = Long.toString(new Date().getTime());\r
+ aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca");\r
+ aaiEnrichmentHeaders.put("X-TransactionId", transactionId);\r
+ aaiEnrichmentHeaders.put("Accept", "application/json");\r
+ aaiEnrichmentHeaders.put("Real-Time", "true");\r
+ aaiEnrichmentHeaders.put("Content-Type", "application/json");\r
+ return aaiEnrichmentHeaders;\r
+ }\r
+\r
+\r
+ /**\r
+ * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object\r
+ *\r
+ * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details\r
+ * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String\r
+ * @param alertString Alert String\r
+ * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record\r
+ */\r
+ private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails,\r
+ final String alertString, final String keyPrefix) {\r
+\r
+ if (aaiEnrichmentDetails == null) {\r
+ LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." +\r
+ "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString);\r
+\r
+ } else {\r
+\r
+ final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails);\r
+\r
+ if (enrichmentDetailsAAI != null) {\r
+ final Set<Map.Entry<String, Object>> enrichedAAIEntrySet =\r
+ enrichmentDetailsAAI.getDynamicProperties().entrySet();\r
+ final Map<String, Object> preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties();\r
+\r
+ // populate A&AI Enrichment details and add prefix to key\r
+ for (Map.Entry<String, Object> enrichedAAIEntry : enrichedAAIEntrySet) {\r
+ preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(),\r
+ enrichedAAIEntry.getValue());\r
+ }\r
+\r
+ LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}",\r
+ alertString, preEnrichmentAAI);\r
+ } else {\r
+ LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." +\r
+ "Skipping Enrichment for alert message: {}",\r
+ preEnrichmentAAI, aaiEnrichmentDetails, alertString);\r
+ }\r
+ }\r
+\r
+ }\r
+\r
+ /**\r
+ * Creates a new A&AI object with only top level A&AI Enrichment details\r
+ *\r
+ * @param aaiEnrichmentDetails A&AI Enrichment details\r
+ *\r
+ * @return new A&AI with only top level A&AI Enrichment details\r
+ */\r
+ private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) {\r
+ try {\r
+ final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails);\r
+ final Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();\r
+ while (fieldsIterator.hasNext()) {\r
+ final Map.Entry<String, JsonNode> fieldEntry = fieldsIterator.next();\r
+ final JsonNode jsonNode = fieldEntry.getValue();\r
+ // remove all arrays, objects from A&AI Enrichment Json\r
+ if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) {\r
+ fieldsIterator.remove();\r
+ }\r
+ }\r
+ return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class);\r
+ } catch (IOException e) {\r
+ LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e);\r
+ }\r
+ return null;\r
+ }\r
+\r
+}\r