2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.onap.dcae.apod.analytics.tca.utils;
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.JsonNode;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Predicate;
28 import com.google.common.base.Predicates;
29 import com.google.common.base.Supplier;
30 import com.google.common.base.Suppliers;
31 import com.google.common.collect.HashBasedTable;
32 import com.google.common.collect.ImmutableList;
33 import com.google.common.collect.ImmutableMap;
34 import com.google.common.collect.Iterables;
35 import com.google.common.collect.Lists;
36 import com.google.common.collect.Maps;
37 import com.google.common.collect.Table;
38 import com.jayway.jsonpath.DocumentContext;
39 import com.jayway.jsonpath.JsonPath;
40 import com.jayway.jsonpath.TypeRef;
41 import org.apache.commons.lang3.StringUtils;
42 import org.apache.commons.lang3.tuple.ImmutablePair;
43 import org.apache.commons.lang3.tuple.Pair;
44 import org.onap.dcae.apod.analytics.aai.service.AAIEnrichmentClient;
45 import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
46 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
47 import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException;
48 import org.onap.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
49 import org.onap.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
50 import org.onap.dcae.apod.analytics.model.domain.cef.AlertAction;
51 import org.onap.dcae.apod.analytics.model.domain.cef.AlertType;
52 import org.onap.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
53 import org.onap.dcae.apod.analytics.model.domain.cef.Criticality;
54 import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
55 import org.onap.dcae.apod.analytics.model.domain.cef.EventSeverity;
56 import org.onap.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
57 import org.onap.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
58 import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
59 import org.onap.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
60 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Direction;
61 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
62 import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
63 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
64 import org.onap.dcae.apod.analytics.model.facade.tca.AAI;
65 import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
66 import org.onap.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
67 import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
68 import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
69 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
70 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
71 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
72 import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
73 import org.quartz.Job;
74 import org.quartz.JobBuilder;
75 import org.quartz.JobDataMap;
76 import org.quartz.JobDetail;
77 import org.quartz.Scheduler;
78 import org.quartz.SchedulerException;
79 import org.quartz.SimpleScheduleBuilder;
80 import org.quartz.SimpleTrigger;
81 import org.quartz.TriggerBuilder;
82 import org.quartz.impl.StdSchedulerFactory;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
86 import java.io.IOException;
87 import java.math.BigDecimal;
88 import java.util.ArrayList;
89 import java.util.Collections;
90 import java.util.Comparator;
91 import java.util.Date;
92 import java.util.HashMap;
93 import java.util.Iterator;
94 import java.util.LinkedHashMap;
95 import java.util.LinkedList;
96 import java.util.List;
98 import java.util.Properties;
100 import java.util.SortedMap;
101 import java.util.TreeMap;
102 import java.util.UUID;
104 import javax.annotation.Nonnull;
105 import javax.annotation.Nullable;
107 import static com.google.common.collect.Lists.newArrayList;
108 import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
111 * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get
112 * pre configured Json Object Mapper understand serialization and deserialization of CEF Message
115 * @author Rajiv Singla . Creation Date: 10/24/2016.
117 public abstract class TCAUtils extends AnalyticsModelJsonUtils {
119 private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
122 * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
125 private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
127 public int compare(Threshold threshold1, Threshold threshold2) {
128 return threshold1.getSeverity().compareTo(threshold2.getSeverity());
133 * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}
135 * @return TCA Policy Metrics Per Event Name list
137 public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {
138 return new Function<TCAPolicy, List<MetricsPerEventName>>() {
141 public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {
142 return tcaPolicy.getMetricsPerEventName();
148 * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from
149 * {@link MetricsPerEventName}
151 * @return Event Names or a Metrics Per Event Name object
153 public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {
154 return new Function<MetricsPerEventName, String>() {
156 public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {
157 return metricsPerEventName.getEventName();
164 * Extracts {@link TCAPolicy} Event Names
166 * @param tcaPolicy TCA Policy
167 * @return List of event names in the TCA Policy
169 public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {
170 final List<MetricsPerEventName> metricsPerEventNames =
171 tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
173 return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());
177 * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to
178 * change during runtime
180 * @param tcaPolicy TCA Policy
181 * @return a Supplier that memoize the TCA Policy event names
183 public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {
184 return Suppliers.memoize(new Supplier<List<String>>() {
186 public List<String> get() {
187 return getPolicyEventNames(tcaPolicy);
194 * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path
196 * @param tcaPolicy TCA Policy
197 * @return A table with Keys of event name and field path containing List of threshold as values
199 public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {
200 final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
201 for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {
202 final String eventName = metricsPerEventName.getEventName();
203 final List<Threshold> thresholds = metricsPerEventName.getThresholds();
204 for (Threshold threshold : thresholds) {
205 final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());
206 if (existingThresholds == null) {
207 final LinkedList<Threshold> newThresholdList = new LinkedList<>();
208 newThresholdList.add(threshold);
209 domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);
211 domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);
215 return domainFRTable;
220 * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table
222 * @param tcaPolicy TCA Policy
223 * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values
225 public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier
226 (final TCAPolicy tcaPolicy) {
227 return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
229 public Table<String, String, List<Threshold>> get() {
230 return getPolicyEventNameThresholdsTable(tcaPolicy);
237 * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
238 * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to
239 * filter out messages which does not match policy domain or event Name
241 * @param cefMessage CEF Message
242 * @param tcaPolicy TCA Policy
243 * @return Message Process Context after processing filter chain
245 public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
246 @Nonnull final TCAPolicy tcaPolicy) {
248 final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
249 final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
250 final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();
251 // Create a list of message processors
252 final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
253 ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);
254 final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
255 // Create a message processors chain
256 final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
257 new GenericMessageChainProcessor<>(messageProcessors, processorContext);
259 return tcaProcessingChain.processChain();
264 * Extracts json path values for given json Field Paths from using Json path notation. Assumes
265 * that values extracted are always long
267 * @param message CEF Message
268 * @param jsonFieldPaths Json Field Paths
269 * @return Map containing key as json path and values as values associated with that json path
271 public static Map<String, List<BigDecimal>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
274 final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
275 final DocumentContext documentContext = JsonPath.parse(message);
277 for (String jsonFieldPath : jsonFieldPaths) {
278 List<BigDecimal> jsonFieldValues = null;
281 jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
283 } catch (Exception e) {
284 final String errorMessage = String.format(
285 "Unable to convert jsonFieldPath: %s value to valid number. " +
286 "Json Path value is not in a valid number format. Incoming message: %s",
287 jsonFieldPath, message);
288 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
290 // If Json Field Values are not or empty
291 if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
292 // Filter out all null values in the filed values list
293 final List<BigDecimal> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
294 Predicates.<BigDecimal>notNull()));
295 // If there are non null values put them in the map
296 if (!nonNullValues.isEmpty()) {
297 jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
302 return jsonFieldPathMap;
306 * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
307 * it applies threshold in order of their severity and record the first threshold per message field path
309 * @param messageFieldValues Field Path Values extracted from CEF Message
310 * @param fieldThresholds Policy Thresholds for Field Path
311 * @return Optional of violated threshold for a field path
313 public static Optional<Threshold> thresholdCalculator(final List<BigDecimal> messageFieldValues, final
316 // order thresholds by severity
317 Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
318 // Now apply each threshold to field values
319 for (Threshold fieldThreshold : fieldThresholds) {
320 for (BigDecimal messageFieldValue : messageFieldValues) {
321 final Boolean isThresholdViolated =
322 fieldThreshold.getDirection().operate(messageFieldValue, new BigDecimal(fieldThreshold
323 .getThresholdValue()));
324 if (isThresholdViolated) {
325 final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
326 violatedThreshold.setActualFieldValue(messageFieldValue);
327 return Optional.of(violatedThreshold);
331 return Optional.absent();
335 * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
336 * Grabs first highest priority violated threshold
338 * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
339 * @return First Highest priority violated threshold
341 public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
343 final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
345 if (violatedThresholds.size() == 1) {
346 return violatedThresholds.get(0);
348 Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
349 // Just grab the first violated threshold with highest priority
350 return violatedThresholds.get(0);
355 * Creates {@link MetricsPerEventName} object which contains violated thresholds
357 * @param tcaPolicy TCA Policy
358 * @param violatedThreshold Violated thresholds
359 * @param eventName Event Name
361 * @return MetricsPerEventName object containing one highest severity violated threshold
363 public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
364 @Nonnull final Threshold violatedThreshold,
365 @Nonnull final String eventName) {
367 final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(
368 Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {
370 public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {
371 return metricsPerEventName.getEventName().equals(eventName);
374 // TCA policy must have only one metrics per event Name
375 if (metricsPerEventNames.size() == 1) {
376 final MetricsPerEventName violatedMetrics =
377 MetricsPerEventName.copy(metricsPerEventNames.get(0));
378 violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
379 return violatedMetrics;
381 final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);
382 throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
387 * Computes threshold violations
389 * @param processorContext Filtered processor Context
390 * @return processor context with any threshold violations
392 public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
393 final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
394 return policyThresholdsProcessor.apply(processorContext);
399 * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
402 * @param processorContextWithViolations processor context which has TCA violations
403 * @param tcaAppName tca app name
404 * @param isAlertInCEFFormat determines if output alert is in CEF format
406 * @return TCA Alert String
408 * @throws JsonProcessingException If alert cannot be parsed into JSON String
410 public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
411 final String tcaAppName,
412 final Boolean isAlertInCEFFormat) throws JsonProcessingException {
413 if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
414 final EventListener eventListenerWithViolations =
415 addThresholdViolationFields(processorContextWithViolations);
416 final String alertString = writeValueAsString(eventListenerWithViolations);
417 LOG.debug("Created alert in CEF Format: {}", alertString);
420 final TCAVESResponse newTCAVESResponse =
421 createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
422 final String alertString = writeValueAsString(newTCAVESResponse);
423 LOG.debug("Created alert in Non CEF Format: {}", alertString);
429 * Adds threshold violation fields to {@link EventListener}
431 * @param processorContextWithViolations processor context that contains violations
432 * @return event listener with threshold crossing alert fields populated
434 public static EventListener addThresholdViolationFields(
435 final TCACEFProcessorContext processorContextWithViolations) {
437 final MetricsPerEventName metricsPerEventName =
438 processorContextWithViolations.getMetricsPerEventName();
439 // confirm violations are indeed present
440 if (metricsPerEventName == null) {
441 final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
442 throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
445 // get violated threshold
446 final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
447 final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
448 final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
450 // create new threshold crossing alert fields
451 final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
452 thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
453 thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
454 thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
455 thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
456 thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
457 thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
458 thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
459 thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());
461 // create new performance count
462 final PerformanceCounter performanceCounter = new PerformanceCounter();
463 performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
464 performanceCounter.setName(violatedThreshold.getFieldPath());
465 performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
466 performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
468 // set additional parameters for threshold crossing alert fields
469 thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
471 // add threshold crossing fields to existing event listener
472 eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
474 return eventListener;
478 * Converts {@link EventSeverity} to {@link Criticality}
480 * @param eventSeverity event severity
482 * @return performance counter criticality
484 private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
485 switch (eventSeverity) {
487 return Criticality.CRIT;
489 return Criticality.MAJ;
491 return Criticality.UNKNOWN;
496 * Creates {@link TCAVESResponse} object
498 * @param processorContext processor Context with violations
499 * @param tcaAppName TCA App Name
501 * @return TCA VES Response Message
503 public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
504 final String tcaAppName) {
506 final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
507 // confirm violations are indeed present
508 if (metricsPerEventName == null) {
509 final String errorMessage = "No violations metrics. Unable to create VES Response";
510 throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
513 final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
514 final EventListener eventListener = processorContext.getCEFEventListener();
515 final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
517 final TCAVESResponse tcavesResponse = new TCAVESResponse();
518 // ClosedLoopControlName included in the DCAE configuration Policy
519 tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
520 // version included in the DCAE configuration Policy
521 tcavesResponse.setVersion(violatedThreshold.getVersion());
522 // Generate a UUID for this output message
523 tcavesResponse.setRequestID(UUID.randomUUID().toString());
524 // commonEventHeader.startEpochMicrosec from the received VES message
525 tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
526 // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts
527 if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) {
528 tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec());
530 // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
531 tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
533 final AAI aai = new AAI();
534 tcavesResponse.setAai(aai);
536 // VM specific settings
537 if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {
539 tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);
540 // Hard Coded - "vserver.vserver-name"
541 tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);
542 // commonEventHeader.sourceName from the received VES message
543 aai.setGenericServerName(commonEventHeader.getSourceName());
545 // VNF specific settings
546 // Hard Coded - "VNF"
547 tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);
548 // Hard Coded - "generic-vnf.vnf-name"
549 tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);
550 // commonEventHeader.sourceName from the received VES message
551 aai.setGenericVNFName(commonEventHeader.getSourceName());
554 // Hard Coded - "DCAE"
555 tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
556 // policyScope included in the DCAE configuration Policy
557 tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());
558 // policyName included in the DCAE configuration Policy
559 tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());
560 // policyVersion included in the DCAE configuration Policy
561 tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());
562 // Extracted from violated threshold
563 tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());
565 return tcavesResponse;
570 * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
572 * @param tcavesResponse alert
574 * @return control Loop Schema Type
576 public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) {
577 final AAI aai = tcavesResponse.getAai();
578 if (aai.getGenericServerName() != null) {
579 return ControlLoopSchemaType.VM;
581 return ControlLoopSchemaType.VNF;
586 * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
588 * @param tcavesResponse {@link TCAVESResponse} TCA alert
590 * @return Source name
592 public static String determineSourceName(final TCAVESResponse tcavesResponse) {
593 final AAI aai = tcavesResponse.getAai();
594 if (aai.getGenericServerName() != null) {
595 return aai.getGenericServerName();
597 return aai.getGenericVNFName();
603 * Extract Domain and Event Name from processor context if present
605 * @param processorContext processor context
606 * @return Tuple of domain and event Name
608 public static Pair<String, String> getDomainAndEventName(
609 @Nullable final TCACEFProcessorContext processorContext) {
611 String domain = null;
612 String eventName = null;
614 if (processorContext != null &&
615 processorContext.getCEFEventListener() != null &&
616 processorContext.getCEFEventListener().getEvent() != null &&
617 processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
618 final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
619 .getCommonEventHeader();
621 if (commonEventHeader.getDomain() != null) {
622 domain = commonEventHeader.getDomain().name();
625 if (commonEventHeader.getEventName() != null) {
626 eventName = commonEventHeader.getEventName();
631 return new ImmutablePair<>(domain, eventName);
636 * Creates {@link TCAPolicy} Metrics per Event Name list
638 * @param eventNamesMap Map containing event Name as key and corresponding values
640 * @return List of {@link MetricsPerEventName}
642 public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(
643 final Map<String, Map<String, String>> eventNamesMap) {
645 // create a new metrics per event Name list
646 final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();
648 for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {
650 // create new metrics per event Name instance
651 final MetricsPerEventName newMetricsPerEventName =
652 createNewMetricsPerEventName(eventNamesEntry);
653 metricsPerEventNames.add(newMetricsPerEventName);
655 // determine all threshold related values
656 final Map<String, String> thresholdsValuesMaps =
657 filterMapByKeyNamePrefix(eventNamesEntry.getValue(),
658 AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
660 // create a map of all threshold values
661 final Map<String, Map<String, String>> thresholdsMap =
662 extractSubTree(thresholdsValuesMaps, 1, 2,
663 AnalyticsConstants.TCA_POLICY_DELIMITER);
665 // add thresholds to nmetrics per event Names threshold list
666 for (Map<String, String> thresholdMap : thresholdsMap.values()) {
667 newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));
672 return metricsPerEventNames;
676 * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
678 * @param thresholdMap threshold map with threshold values
680 * @return new instance of TCA Policy Threshold
682 public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
683 final Threshold threshold = new Threshold();
684 threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
685 threshold.setVersion(thresholdMap.get("policy.version"));
686 threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
687 threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
688 threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
689 threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
690 threshold.setClosedLoopEventStatus(
691 ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));
696 * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope
697 * extracted from given eventNamesEntry
699 * @param eventNamesEntry Event Names Entry
701 * @return new instance of MetricsPerEventName
703 public static MetricsPerEventName createNewMetricsPerEventName(
704 final Map.Entry<String, Map<String, String>> eventNamesEntry) {
705 // determine event Name
706 final String eventName = eventNamesEntry.getKey();
707 // determine event Name thresholds
708 final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();
709 final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();
710 final List<Threshold> thresholds = new LinkedList<>();
711 metricsPerEventName.setThresholds(thresholds);
712 metricsPerEventName.setEventName(eventName);
713 // bind policyName, policyVersion, policyScope and closedLoopControlName
714 metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));
715 metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));
716 metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));
717 metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(
718 metricsPerEventNameThresholdsMap.get("controlLoopSchemaType")));
719 return metricsPerEventName;
723 * Converts a flattened key/value map which has keys delimited by a given delimiter.
724 * The start Index and end index extract the sub-key value and returns a new map containing
725 * sub-keys and values.
727 * @param actualMap actual Map
728 * @param startIndex start index
729 * @param endIndex end index
730 * @param delimiter delimiter
732 * @return Map with new sub tree map
734 public static Map<String, Map<String, String>> extractSubTree(
735 final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
737 final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
739 // iterate over actual map entries
740 for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
741 final String actualMapKey = actualMapEntry.getKey();
742 final String actualMapValue = actualMapEntry.getValue();
744 // determine delimiter start and end index
745 final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
746 final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
747 final int keyLength = actualMapKey.length();
749 // extract sub-tree map
750 if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
751 final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
752 final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
753 final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
754 if (existingThresholdMap == null) {
755 Map<String, String> newThresholdMap = new LinkedHashMap<>();
756 newThresholdMap.put(subMapKey, actualMapValue);
757 subTreeMap.put(thresholdKey, newThresholdMap);
759 existingThresholdMap.put(subMapKey, actualMapValue);
771 * Provides a view of underlying map that filters out entries with keys starting with give prefix
773 * @param actualMap Target map that needs to be filtered
774 * @param keyNamePrefix key prefix
776 * @return a view of actual map which only show entries which have give prefix
778 public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
779 final String keyNamePrefix) {
780 return Maps.filterKeys(actualMap,
781 new Predicate<String>() {
783 public boolean apply(@Nullable String key) {
784 return key != null && key.startsWith(keyNamePrefix);
791 * Creates Quartz Scheduler
793 * @param pollingIntervalMS polling interval
794 * @param stdSchedulerFactory Quartz standard schedule factory instance
795 * @param quartzPublisherPropertiesFileName quartz properties file name
796 * @param jobDataMap job Data map
797 * @param quartzJobClass Quartz Job Class
798 * @param quartzJobName Quartz Job Name
799 * @param quartzTriggerName Quartz Trigger name
801 * @param <T> An implementation of Quartz {@link Job} interface
802 * @return Configured Quartz Scheduler
804 * @throws SchedulerException exception if unable to create to Quartz Scheduler
806 public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
807 final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
808 final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
809 final String quartzTriggerName) throws SchedulerException {
811 // Initialize a new Quartz Standard scheduler
812 LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
813 quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
814 final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
815 quartzPublisherPropertiesFileName, new Properties());
816 stdSchedulerFactory.initialize(quartzProperties);
817 final Scheduler scheduler = stdSchedulerFactory.getScheduler();
819 // Create a new job detail
820 final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
821 AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
823 // Create a new scheduling builder
824 final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
825 .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
826 .repeatForever(); // repeats while worker is running
828 // Create a trigger for the TCA Publisher Job
829 final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
830 .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
831 .startNow() // job starts right away
832 .withSchedule(simpleScheduleBuilder).build();
834 scheduler.scheduleJob(jobDetail, simpleTrigger);
835 LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());
841 * Does A&AI Enrichment for VM
843 * @param tcavesResponse Outgoing alert object
844 * @param aaiEnrichmentClient A&AI Enrichment client
845 * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path
846 * @param alertString alert String
847 * @param vmSourceName vm source name
849 public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse,
850 final AAIEnrichmentClient aaiEnrichmentClient,
851 final String aaiVMEnrichmentAPIPath,
852 final String alertString,
853 final String vmSourceName) {
855 final String filterString = "vserver-name:EQUALS:" + vmSourceName;
856 final ImmutableMap<String, String> queryParams = ImmutableMap.of(
857 "search-node-type", "vserver", "filter", filterString);
859 // fetch vm object resource Link from A&AI
860 final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails(
861 aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
862 final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails);
864 if (vmObjectResourceLink == null) {
865 LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " +
866 "determined for vmSourceName: {}.", alertString, vmSourceName);
869 LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}",
870 vmSourceName, vmObjectResourceLink);
872 // fetch vm A&AI Enrichment
873 final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
874 vmObjectResourceLink, Collections.<String, String>emptyMap(), createAAIEnrichmentHeaders());
877 enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString,
878 AnalyticsConstants.AAI_VSERVER_KEY_PREFIX);
886 * Does A&AI Enrichment for VNF
888 * @param tcavesResponse Outgoing alert object
889 * @param aaiEnrichmentClient A&AI Enrichment client
890 * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path
891 * @param alertString alert String
892 * @param vnfSourceName vnf source name
894 public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse,
895 final AAIEnrichmentClient aaiEnrichmentClient,
896 final String aaiVNFEnrichmentAPIPath,
897 final String alertString,
898 final String vnfSourceName) {
899 final ImmutableMap<String, String> queryParams = ImmutableMap.of("vnf-name", vnfSourceName);
901 // fetch vnf A&AI Enrichment
902 final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
903 aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
906 enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX);
910 * Fetches VM Object Resource Link from A&AI Resource Link Json
912 * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json
914 * @return object resource link String
916 private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) {
917 if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) {
919 final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails);
920 final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link");
921 if (!resourceLinkJsonNode.isMissingNode()) {
922 return resourceLinkJsonNode.asText();
924 } catch (IOException e) {
925 LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}",
926 vmAAIResourceLinkDetails, e);
933 * Creates Http Headers for A&AI Enrichment client
935 * @return Http Headers Map for A&AI Enrichment client
937 private static Map<String, String> createAAIEnrichmentHeaders() {
938 final Map<String, String> aaiEnrichmentHeaders = new LinkedHashMap<>();
939 final String transactionId = Long.toString(new Date().getTime());
940 aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca");
941 aaiEnrichmentHeaders.put("X-TransactionId", transactionId);
942 aaiEnrichmentHeaders.put("Accept", "application/json");
943 aaiEnrichmentHeaders.put("Real-Time", "true");
944 aaiEnrichmentHeaders.put("Content-Type", "application/json");
945 return aaiEnrichmentHeaders;
950 * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object
952 * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details
953 * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String
954 * @param alertString Alert String
955 * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record
957 private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails,
958 final String alertString, final String keyPrefix) {
960 if (aaiEnrichmentDetails == null) {
961 LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." +
962 "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString);
966 final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails);
968 if (enrichmentDetailsAAI != null) {
969 final Set<Map.Entry<String, Object>> enrichedAAIEntrySet =
970 enrichmentDetailsAAI.getDynamicProperties().entrySet();
971 final Map<String, Object> preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties();
973 // populate A&AI Enrichment details and add prefix to key
974 for (Map.Entry<String, Object> enrichedAAIEntry : enrichedAAIEntrySet) {
975 preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(),
976 enrichedAAIEntry.getValue());
979 LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}",
980 alertString, preEnrichmentAAI);
982 LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." +
983 "Skipping Enrichment for alert message: {}",
984 preEnrichmentAAI, aaiEnrichmentDetails, alertString);
991 * Creates a new A&AI object with only top level A&AI Enrichment details
993 * @param aaiEnrichmentDetails A&AI Enrichment details
995 * @return new A&AI with only top level A&AI Enrichment details
997 private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) {
999 final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails);
1000 final Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
1001 while (fieldsIterator.hasNext()) {
1002 final Map.Entry<String, JsonNode> fieldEntry = fieldsIterator.next();
1003 final JsonNode jsonNode = fieldEntry.getValue();
1004 // remove all arrays, objects from A&AI Enrichment Json
1005 if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) {
1006 fieldsIterator.remove();
1009 return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class);
1010 } catch (IOException e) {
1011 LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e);