2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.tca.utils;
\r
23 import com.fasterxml.jackson.core.JsonProcessingException;
\r
24 import com.fasterxml.jackson.databind.JsonNode;
\r
25 import com.google.common.base.Function;
\r
26 import com.google.common.base.Optional;
\r
27 import com.google.common.base.Predicate;
\r
28 import com.google.common.base.Predicates;
\r
29 import com.google.common.base.Supplier;
\r
30 import com.google.common.base.Suppliers;
\r
31 import com.google.common.collect.HashBasedTable;
\r
32 import com.google.common.collect.ImmutableList;
\r
33 import com.google.common.collect.ImmutableMap;
\r
34 import com.google.common.collect.Iterables;
\r
35 import com.google.common.collect.Lists;
\r
36 import com.google.common.collect.Maps;
\r
37 import com.google.common.collect.Table;
\r
38 import com.jayway.jsonpath.DocumentContext;
\r
39 import com.jayway.jsonpath.JsonPath;
\r
40 import com.jayway.jsonpath.TypeRef;
\r
41 import org.apache.commons.lang3.StringUtils;
\r
42 import org.apache.commons.lang3.tuple.ImmutablePair;
\r
43 import org.apache.commons.lang3.tuple.Pair;
\r
44 import org.openecomp.dcae.apod.analytics.aai.service.AAIEnrichmentClient;
\r
45 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
\r
46 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
\r
47 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
\r
48 import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
\r
49 import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
\r
50 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;
\r
51 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;
\r
52 import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
\r
53 import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;
\r
54 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
\r
55 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
\r
56 import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
\r
57 import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
\r
58 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
\r
59 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
\r
60 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
\r
61 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
\r
62 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
\r
63 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
\r
64 import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;
\r
65 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
\r
66 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
\r
67 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
\r
68 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
\r
69 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
\r
70 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
\r
71 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
\r
72 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
\r
73 import org.quartz.Job;
\r
74 import org.quartz.JobBuilder;
\r
75 import org.quartz.JobDataMap;
\r
76 import org.quartz.JobDetail;
\r
77 import org.quartz.Scheduler;
\r
78 import org.quartz.SchedulerException;
\r
79 import org.quartz.SimpleScheduleBuilder;
\r
80 import org.quartz.SimpleTrigger;
\r
81 import org.quartz.TriggerBuilder;
\r
82 import org.quartz.impl.StdSchedulerFactory;
\r
83 import org.slf4j.Logger;
\r
84 import org.slf4j.LoggerFactory;
\r
86 import java.io.IOException;
\r
87 import java.math.BigDecimal;
\r
88 import java.util.ArrayList;
\r
89 import java.util.Collections;
\r
90 import java.util.Comparator;
\r
91 import java.util.Date;
\r
92 import java.util.HashMap;
\r
93 import java.util.Iterator;
\r
94 import java.util.LinkedHashMap;
\r
95 import java.util.LinkedList;
\r
96 import java.util.List;
\r
97 import java.util.Map;
\r
98 import java.util.Properties;
\r
99 import java.util.Set;
\r
100 import java.util.SortedMap;
\r
101 import java.util.TreeMap;
\r
102 import java.util.UUID;
\r
104 import javax.annotation.Nonnull;
\r
105 import javax.annotation.Nullable;
\r
107 import static com.google.common.collect.Lists.newArrayList;
\r
108 import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
\r
111 * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get
\r
112 * a pre-configured JSON Object Mapper that understands serialization and deserialization of CEF Message
\r
115 * @author Rajiv Singla . Creation Date: 10/24/2016.
\r
117 public abstract class TCAUtils extends AnalyticsModelJsonUtils {
\r
119 private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
\r
122 * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
\r
125 private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
\r
127 public int compare(Threshold threshold1, Threshold threshold2) {
\r
128 return threshold1.getSeverity().compareTo(threshold2.getSeverity());
\r
133 * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}
\r
135 * @return TCA Policy Metrics Per Event Name list
\r
137 public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {
\r
138 return new Function<TCAPolicy, List<MetricsPerEventName>>() {
\r
141 public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {
\r
142 return tcaPolicy.getMetricsPerEventName();
\r
148 * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from
\r
149 * {@link MetricsPerEventName}
\r
151 * @return Event Names or a Metrics Per Event Name object
\r
153 public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {
\r
154 return new Function<MetricsPerEventName, String>() {
\r
156 public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {
\r
157 return metricsPerEventName.getEventName();
\r
164 * Extracts {@link TCAPolicy} Event Names
\r
166 * @param tcaPolicy TCA Policy
\r
167 * @return List of event names in the TCA Policy
\r
169 public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {
\r
170 final List<MetricsPerEventName> metricsPerEventNames =
\r
171 tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
\r
173 return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());
\r
177 * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to
\r
178 * change during runtime
\r
180 * @param tcaPolicy TCA Policy
\r
181 * @return a Supplier that memoize the TCA Policy event names
\r
183 public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {
\r
184 return Suppliers.memoize(new Supplier<List<String>>() {
\r
186 public List<String> get() {
\r
187 return getPolicyEventNames(tcaPolicy);
\r
194 * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path
\r
196 * @param tcaPolicy TCA Policy
\r
197 * @return A table with Keys of event name and field path containing List of threshold as values
\r
199 public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {
\r
200 final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
\r
201 for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {
\r
202 final String eventName = metricsPerEventName.getEventName();
\r
203 final List<Threshold> thresholds = metricsPerEventName.getThresholds();
\r
204 for (Threshold threshold : thresholds) {
\r
205 final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());
\r
206 if (existingThresholds == null) {
\r
207 final LinkedList<Threshold> newThresholdList = new LinkedList<>();
\r
208 newThresholdList.add(threshold);
\r
209 domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);
\r
211 domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);
\r
215 return domainFRTable;
\r
220 * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table
\r
222 * @param tcaPolicy TCA Policy
\r
223 * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values
\r
225 public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier
\r
226 (final TCAPolicy tcaPolicy) {
\r
227 return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
\r
229 public Table<String, String, List<Threshold>> get() {
\r
230 return getPolicyEventNameThresholdsTable(tcaPolicy);
\r
237 * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
\r
238 * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to
\r
239 * filter out messages which does not match policy domain or event Name
\r
241 * @param cefMessage CEF Message
\r
242 * @param tcaPolicy TCA Policy
\r
243 * @return Message Process Context after processing filter chain
\r
245 public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
\r
246 @Nonnull final TCAPolicy tcaPolicy) {
\r
248 final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
\r
249 final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
\r
250 final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();
\r
251 // Create a list of message processors
\r
252 final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
\r
253 ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);
\r
254 final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
\r
255 // Create a message processors chain
\r
256 final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
\r
257 new GenericMessageChainProcessor<>(messageProcessors, processorContext);
\r
259 return tcaProcessingChain.processChain();
\r
264 * Extracts json path values for given json Field Paths from using Json path notation. Assumes
\r
265 * that values extracted are always long
\r
267 * @param message CEF Message
\r
268 * @param jsonFieldPaths Json Field Paths
\r
269 * @return Map containing key as json path and values as values associated with that json path
\r
271 public static Map<String, List<BigDecimal>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
\r
274 final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
\r
275 final DocumentContext documentContext = JsonPath.parse(message);
\r
277 for (String jsonFieldPath : jsonFieldPaths) {
\r
278 List<BigDecimal> jsonFieldValues = null;
\r
281 jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
\r
283 } catch (Exception e) {
\r
284 final String errorMessage = String.format(
\r
285 "Unable to convert jsonFieldPath: %s value to valid number. " +
\r
286 "Json Path value is not in a valid number format. Incoming message: %s",
\r
287 jsonFieldPath, message);
\r
288 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
290 // If Json Field Values are not or empty
\r
291 if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
\r
292 // Filter out all null values in the filed values list
\r
293 final List<BigDecimal> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
\r
294 Predicates.<BigDecimal>notNull()));
\r
295 // If there are non null values put them in the map
\r
296 if (!nonNullValues.isEmpty()) {
\r
297 jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
\r
302 return jsonFieldPathMap;
\r
306 * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
\r
307 * it applies threshold in order of their severity and record the first threshold per message field path
\r
309 * @param messageFieldValues Field Path Values extracted from CEF Message
\r
310 * @param fieldThresholds Policy Thresholds for Field Path
\r
311 * @return Optional of violated threshold for a field path
\r
313 public static Optional<Threshold> thresholdCalculator(final List<BigDecimal> messageFieldValues, final
\r
316 // order thresholds by severity
\r
317 Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
\r
318 // Now apply each threshold to field values
\r
319 for (Threshold fieldThreshold : fieldThresholds) {
\r
320 for (BigDecimal messageFieldValue : messageFieldValues) {
\r
321 final Boolean isThresholdViolated =
\r
322 fieldThreshold.getDirection().operate(messageFieldValue, new BigDecimal(fieldThreshold
\r
323 .getThresholdValue()));
\r
324 if (isThresholdViolated) {
\r
325 final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
\r
326 violatedThreshold.setActualFieldValue(messageFieldValue);
\r
327 return Optional.of(violatedThreshold);
\r
331 return Optional.absent();
\r
335 * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
\r
336 * Grabs first highest priority violated threshold
\r
338 * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
\r
339 * @return First Highest priority violated threshold
\r
341 public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
\r
343 final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
\r
345 if (violatedThresholds.size() == 1) {
\r
346 return violatedThresholds.get(0);
\r
348 Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
\r
349 // Just grab the first violated threshold with highest priority
\r
350 return violatedThresholds.get(0);
\r
355 * Creates {@link MetricsPerEventName} object which contains violated thresholds
\r
357 * @param tcaPolicy TCA Policy
\r
358 * @param violatedThreshold Violated thresholds
\r
359 * @param eventName Event Name
\r
361 * @return MetricsPerEventName object containing one highest severity violated threshold
\r
363 public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
\r
364 @Nonnull final Threshold violatedThreshold,
\r
365 @Nonnull final String eventName) {
\r
367 final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(
\r
368 Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {
\r
370 public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {
\r
371 return metricsPerEventName.getEventName().equals(eventName);
\r
374 // TCA policy must have only one metrics per event Name
\r
375 if (metricsPerEventNames.size() == 1) {
\r
376 final MetricsPerEventName violatedMetrics =
\r
377 MetricsPerEventName.copy(metricsPerEventNames.get(0));
\r
378 violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
\r
379 return violatedMetrics;
\r
381 final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);
\r
382 throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
\r
387 * Computes threshold violations
\r
389 * @param processorContext Filtered processor Context
\r
390 * @return processor context with any threshold violations
\r
392 public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
\r
393 final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
\r
394 return policyThresholdsProcessor.apply(processorContext);
\r
399 * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
\r
402 * @param processorContextWithViolations processor context which has TCA violations
\r
403 * @param tcaAppName tca app name
\r
404 * @param isAlertInCEFFormat determines if output alert is in CEF format
\r
406 * @return TCA Alert String
\r
408 * @throws JsonProcessingException If alert cannot be parsed into JSON String
\r
410 public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
\r
411 final String tcaAppName,
\r
412 final Boolean isAlertInCEFFormat) throws JsonProcessingException {
\r
413 if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
\r
414 final EventListener eventListenerWithViolations =
\r
415 addThresholdViolationFields(processorContextWithViolations);
\r
416 final String alertString = writeValueAsString(eventListenerWithViolations);
\r
417 LOG.debug("Created alert in CEF Format: {}", alertString);
\r
418 return alertString;
\r
420 final TCAVESResponse newTCAVESResponse =
\r
421 createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
\r
422 final String alertString = writeValueAsString(newTCAVESResponse);
\r
423 LOG.debug("Created alert in Non CEF Format: {}", alertString);
\r
424 return alertString;
\r
429 * Adds threshold violation fields to {@link EventListener}
\r
431 * @param processorContextWithViolations processor context that contains violations
\r
432 * @return event listener with threshold crossing alert fields populated
\r
434 public static EventListener addThresholdViolationFields(
\r
435 final TCACEFProcessorContext processorContextWithViolations) {
\r
437 final MetricsPerEventName metricsPerEventName =
\r
438 processorContextWithViolations.getMetricsPerEventName();
\r
439 // confirm violations are indeed present
\r
440 if (metricsPerEventName == null) {
\r
441 final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
\r
442 throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
\r
445 // get violated threshold
\r
446 final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
\r
447 final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
\r
448 final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
\r
450 // create new threshold crossing alert fields
\r
451 final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
\r
452 thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
\r
453 thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
\r
454 thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
\r
455 thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
\r
456 thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
\r
457 thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
\r
458 thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
\r
459 thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());
\r
461 // create new performance count
\r
462 final PerformanceCounter performanceCounter = new PerformanceCounter();
\r
463 performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
\r
464 performanceCounter.setName(violatedThreshold.getFieldPath());
\r
465 performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
\r
466 performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
\r
468 // set additional parameters for threshold crossing alert fields
\r
469 thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
\r
471 // add threshold crossing fields to existing event listener
\r
472 eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
\r
474 return eventListener;
\r
478 * Converts {@link EventSeverity} to {@link Criticality}
\r
480 * @param eventSeverity event severity
\r
482 * @return performance counter criticality
\r
484 private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
\r
485 switch (eventSeverity) {
\r
487 return Criticality.CRIT;
\r
489 return Criticality.MAJ;
\r
491 return Criticality.UNKNOWN;
\r
496 * Creates {@link TCAVESResponse} object
\r
498 * @param processorContext processor Context with violations
\r
499 * @param tcaAppName TCA App Name
\r
501 * @return TCA VES Response Message
\r
503 public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
\r
504 final String tcaAppName) {
\r
506 final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
\r
507 // confirm violations are indeed present
\r
508 if (metricsPerEventName == null) {
\r
509 final String errorMessage = "No violations metrics. Unable to create VES Response";
\r
510 throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
\r
513 final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
\r
514 final EventListener eventListener = processorContext.getCEFEventListener();
\r
515 final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
\r
517 final TCAVESResponse tcavesResponse = new TCAVESResponse();
\r
518 // ClosedLoopControlName included in the DCAE configuration Policy
\r
519 tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
\r
520 // version included in the DCAE configuration Policy
\r
521 tcavesResponse.setVersion(violatedThreshold.getVersion());
\r
522 // Generate a UUID for this output message
\r
523 tcavesResponse.setRequestID(UUID.randomUUID().toString());
\r
524 // commonEventHeader.startEpochMicrosec from the received VES message
\r
525 tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
\r
526 // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts
\r
527 if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) {
\r
528 tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec());
\r
530 // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
\r
531 tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
\r
533 final AAI aai = new AAI();
\r
534 tcavesResponse.setAai(aai);
\r
536 // VM specific settings
\r
537 if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {
\r
538 // Hard Coded - "VM"
\r
539 tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);
\r
540 // Hard Coded - "vserver.vserver-name"
\r
541 tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);
\r
542 // commonEventHeader.sourceName from the received VES message
\r
543 aai.setGenericServerName(commonEventHeader.getSourceName());
\r
545 // VNF specific settings
\r
546 // Hard Coded - "VNF"
\r
547 tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);
\r
548 // Hard Coded - "generic-vnf.vnf-name"
\r
549 tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);
\r
550 // commonEventHeader.sourceName from the received VES message
\r
551 aai.setGenericVNFName(commonEventHeader.getSourceName());
\r
554 // Hard Coded - "DCAE"
\r
555 tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
\r
556 // policyScope included in the DCAE configuration Policy
\r
557 tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());
\r
558 // policyName included in the DCAE configuration Policy
\r
559 tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());
\r
560 // policyVersion included in the DCAE configuration Policy
\r
561 tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());
\r
562 // Extracted from violated threshold
\r
563 tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());
\r
565 return tcavesResponse;
\r
570 * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
\r
572 * @param tcavesResponse alert
\r
574 * @return control Loop Schema Type
\r
576 public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) {
\r
577 final AAI aai = tcavesResponse.getAai();
\r
578 if (aai.getGenericServerName() != null) {
\r
579 return ControlLoopSchemaType.VM;
\r
581 return ControlLoopSchemaType.VNF;
\r
586 * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
\r
588 * @param tcavesResponse {@link TCAVESResponse} TCA alert
\r
590 * @return Source name
\r
592 public static String determineSourceName(final TCAVESResponse tcavesResponse) {
\r
593 final AAI aai = tcavesResponse.getAai();
\r
594 if (aai.getGenericServerName() != null) {
\r
595 return aai.getGenericServerName();
\r
597 return aai.getGenericVNFName();
\r
603 * Extract Domain and Event Name from processor context if present
\r
605 * @param processorContext processor context
\r
606 * @return Tuple of domain and event Name
\r
608 public static Pair<String, String> getDomainAndEventName(
\r
609 @Nullable final TCACEFProcessorContext processorContext) {
\r
611 String domain = null;
\r
612 String eventName = null;
\r
614 if (processorContext != null &&
\r
615 processorContext.getCEFEventListener() != null &&
\r
616 processorContext.getCEFEventListener().getEvent() != null &&
\r
617 processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
\r
618 final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
\r
619 .getCommonEventHeader();
\r
621 if (commonEventHeader.getDomain() != null) {
\r
622 domain = commonEventHeader.getDomain().name();
\r
625 if (commonEventHeader.getEventName() != null) {
\r
626 eventName = commonEventHeader.getEventName();
\r
631 return new ImmutablePair<>(domain, eventName);
\r
636 * Creates {@link TCAPolicy} Metrics per Event Name list
\r
638 * @param eventNamesMap Map containing event Name as key and corresponding values
\r
640 * @return List of {@link MetricsPerEventName}
\r
642 public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(
\r
643 final Map<String, Map<String, String>> eventNamesMap) {
\r
645 // create a new metrics per event Name list
\r
646 final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();
\r
648 for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {
\r
650 // create new metrics per event Name instance
\r
651 final MetricsPerEventName newMetricsPerEventName =
\r
652 createNewMetricsPerEventName(eventNamesEntry);
\r
653 metricsPerEventNames.add(newMetricsPerEventName);
\r
655 // determine all threshold related values
\r
656 final Map<String, String> thresholdsValuesMaps =
\r
657 filterMapByKeyNamePrefix(eventNamesEntry.getValue(),
\r
658 AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
\r
660 // create a map of all threshold values
\r
661 final Map<String, Map<String, String>> thresholdsMap =
\r
662 extractSubTree(thresholdsValuesMaps, 1, 2,
\r
663 AnalyticsConstants.TCA_POLICY_DELIMITER);
\r
665 // add thresholds to nmetrics per event Names threshold list
\r
666 for (Map<String, String> thresholdMap : thresholdsMap.values()) {
\r
667 newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));
\r
672 return metricsPerEventNames;
\r
676 * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
\r
678 * @param thresholdMap threshold map with threshold values
\r
680 * @return new instance of TCA Policy Threshold
\r
682 public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
\r
683 final Threshold threshold = new Threshold();
\r
684 threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
\r
685 threshold.setVersion(thresholdMap.get("policy.version"));
\r
686 threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
\r
687 threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
\r
688 threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
\r
689 threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
\r
690 threshold.setClosedLoopEventStatus(
\r
691 ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));
\r
696 * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope
\r
697 * extracted from given eventNamesEntry
\r
699 * @param eventNamesEntry Event Names Entry
\r
701 * @return new instance of MetricsPerEventName
\r
703 public static MetricsPerEventName createNewMetricsPerEventName(
\r
704 final Map.Entry<String, Map<String, String>> eventNamesEntry) {
\r
705 // determine event Name
\r
706 final String eventName = eventNamesEntry.getKey();
\r
707 // determine event Name thresholds
\r
708 final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();
\r
709 final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();
\r
710 final List<Threshold> thresholds = new LinkedList<>();
\r
711 metricsPerEventName.setThresholds(thresholds);
\r
712 metricsPerEventName.setEventName(eventName);
\r
713 // bind policyName, policyVersion, policyScope and closedLoopControlName
\r
714 metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));
\r
715 metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));
\r
716 metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));
\r
717 metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(
\r
718 metricsPerEventNameThresholdsMap.get("controlLoopSchemaType")));
\r
719 return metricsPerEventName;
\r
723 * Converts a flattened key/value map which has keys delimited by a given delimiter.
\r
724 * The start Index and end index extract the sub-key value and returns a new map containing
\r
725 * sub-keys and values.
\r
727 * @param actualMap actual Map
\r
728 * @param startIndex start index
\r
729 * @param endIndex end index
\r
730 * @param delimiter delimiter
\r
732 * @return Map with new sub tree map
\r
734 public static Map<String, Map<String, String>> extractSubTree(
\r
735 final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
\r
737 final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
\r
739 // iterate over actual map entries
\r
740 for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
\r
741 final String actualMapKey = actualMapEntry.getKey();
\r
742 final String actualMapValue = actualMapEntry.getValue();
\r
744 // determine delimiter start and end index
\r
745 final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
\r
746 final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
\r
747 final int keyLength = actualMapKey.length();
\r
749 // extract sub-tree map
\r
750 if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
\r
751 final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
\r
752 final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
\r
753 final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
\r
754 if (existingThresholdMap == null) {
\r
755 Map<String, String> newThresholdMap = new LinkedHashMap<>();
\r
756 newThresholdMap.put(subMapKey, actualMapValue);
\r
757 subTreeMap.put(thresholdKey, newThresholdMap);
\r
759 existingThresholdMap.put(subMapKey, actualMapValue);
\r
771 * Provides a view of underlying map that filters out entries with keys starting with give prefix
\r
773 * @param actualMap Target map that needs to be filtered
\r
774 * @param keyNamePrefix key prefix
\r
776 * @return a view of actual map which only show entries which have give prefix
\r
778 public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
\r
779 final String keyNamePrefix) {
\r
780 return Maps.filterKeys(actualMap,
\r
781 new Predicate<String>() {
\r
783 public boolean apply(@Nullable String key) {
\r
784 return key != null && key.startsWith(keyNamePrefix);
\r
791 * Creates Quartz Scheduler
\r
793 * @param pollingIntervalMS polling interval
\r
794 * @param stdSchedulerFactory Quartz standard schedule factory instance
\r
795 * @param quartzPublisherPropertiesFileName quartz properties file name
\r
796 * @param jobDataMap job Data map
\r
797 * @param quartzJobClass Quartz Job Class
\r
798 * @param quartzJobName Quartz Job Name
\r
799 * @param quartzTriggerName Quartz Trigger name
\r
801 * @param <T> An implementation of Quartz {@link Job} interface
\r
802 * @return Configured Quartz Scheduler
\r
804 * @throws SchedulerException exception if unable to create to Quartz Scheduler
\r
806 public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
\r
807 final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
\r
808 final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
\r
809 final String quartzTriggerName) throws SchedulerException {
\r
811 // Initialize a new Quartz Standard scheduler
\r
812 LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
\r
813 quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
\r
814 final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
\r
815 quartzPublisherPropertiesFileName, new Properties());
\r
816 stdSchedulerFactory.initialize(quartzProperties);
\r
817 final Scheduler scheduler = stdSchedulerFactory.getScheduler();
\r
819 // Create a new job detail
\r
820 final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
\r
821 AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
\r
823 // Create a new scheduling builder
\r
824 final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
\r
825 .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
\r
826 .repeatForever(); // repeats while worker is running
\r
828 // Create a trigger for the TCA Publisher Job
\r
829 final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
\r
830 .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
\r
831 .startNow() // job starts right away
\r
832 .withSchedule(simpleScheduleBuilder).build();
\r
834 scheduler.scheduleJob(jobDetail, simpleTrigger);
\r
835 LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());
\r
841 * Does A&AI Enrichment for VM
\r
843 * @param tcavesResponse Outgoing alert object
\r
844 * @param aaiEnrichmentClient A&AI Enrichment client
\r
845 * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path
\r
846 * @param alertString alert String
\r
847 * @param vmSourceName vm source name
\r
849 public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse,
\r
850 final AAIEnrichmentClient aaiEnrichmentClient,
\r
851 final String aaiVMEnrichmentAPIPath,
\r
852 final String alertString,
\r
853 final String vmSourceName) {
\r
855 final String filterString = "vserver-name:EQUALS:" + vmSourceName;
\r
856 final ImmutableMap<String, String> queryParams = ImmutableMap.of(
\r
857 "search-node-type", "vserver", "filter", filterString);
\r
859 // fetch vm object resource Link from A&AI
\r
860 final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails(
\r
861 aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
\r
862 final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails);
\r
864 if (vmObjectResourceLink == null) {
\r
865 LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " +
\r
866 "determined for vmSourceName: {}.", alertString, vmSourceName);
\r
869 LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}",
\r
870 vmSourceName, vmObjectResourceLink);
\r
872 // fetch vm A&AI Enrichment
\r
873 final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
\r
874 vmObjectResourceLink, Collections.<String, String>emptyMap(), createAAIEnrichmentHeaders());
\r
877 enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString,
\r
878 AnalyticsConstants.AAI_VSERVER_KEY_PREFIX);
\r
886 * Does A&AI Enrichment for VNF
\r
888 * @param tcavesResponse Outgoing alert object
\r
889 * @param aaiEnrichmentClient A&AI Enrichment client
\r
890 * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path
\r
891 * @param alertString alert String
\r
892 * @param vnfSourceName vnf source name
\r
894 public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse,
\r
895 final AAIEnrichmentClient aaiEnrichmentClient,
\r
896 final String aaiVNFEnrichmentAPIPath,
\r
897 final String alertString,
\r
898 final String vnfSourceName) {
\r
899 final ImmutableMap<String, String> queryParams = ImmutableMap.of("vnf-name", vnfSourceName);
\r
901 // fetch vnf A&AI Enrichment
\r
902 final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
\r
903 aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
\r
905 // enrich alert AAI
\r
906 enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX);
\r
910 * Fetches VM Object Resource Link from A&AI Resource Link Json
\r
912 * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json
\r
914 * @return object resource link String
\r
916 private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) {
\r
917 if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) {
\r
919 final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails);
\r
920 final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link");
\r
921 if (!resourceLinkJsonNode.isMissingNode()) {
\r
922 return resourceLinkJsonNode.asText();
\r
924 } catch (IOException e) {
\r
925 LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}",
\r
926 vmAAIResourceLinkDetails, e);
\r
933 * Creates Http Headers for A&AI Enrichment client
\r
935 * @return Http Headers Map for A&AI Enrichment client
\r
937 private static Map<String, String> createAAIEnrichmentHeaders() {
\r
938 final Map<String, String> aaiEnrichmentHeaders = new LinkedHashMap<>();
\r
939 final String transactionId = Long.toString(new Date().getTime());
\r
940 aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca");
\r
941 aaiEnrichmentHeaders.put("X-TransactionId", transactionId);
\r
942 aaiEnrichmentHeaders.put("Accept", "application/json");
\r
943 aaiEnrichmentHeaders.put("Real-Time", "true");
\r
944 aaiEnrichmentHeaders.put("Content-Type", "application/json");
\r
945 return aaiEnrichmentHeaders;
\r
950 * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object
\r
952 * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details
\r
953 * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String
\r
954 * @param alertString Alert String
\r
955 * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record
\r
957 private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails,
\r
958 final String alertString, final String keyPrefix) {
\r
960 if (aaiEnrichmentDetails == null) {
\r
961 LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." +
\r
962 "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString);
\r
966 final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails);
\r
968 if (enrichmentDetailsAAI != null) {
\r
969 final Set<Map.Entry<String, Object>> enrichedAAIEntrySet =
\r
970 enrichmentDetailsAAI.getDynamicProperties().entrySet();
\r
971 final Map<String, Object> preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties();
\r
973 // populate A&AI Enrichment details and add prefix to key
\r
974 for (Map.Entry<String, Object> enrichedAAIEntry : enrichedAAIEntrySet) {
\r
975 preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(),
\r
976 enrichedAAIEntry.getValue());
\r
979 LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}",
\r
980 alertString, preEnrichmentAAI);
\r
982 LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." +
\r
983 "Skipping Enrichment for alert message: {}",
\r
984 preEnrichmentAAI, aaiEnrichmentDetails, alertString);
\r
991 * Creates a new A&AI object with only top level A&AI Enrichment details
\r
993 * @param aaiEnrichmentDetails A&AI Enrichment details
\r
995 * @return new A&AI with only top level A&AI Enrichment details
\r
997 private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) {
\r
999 final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails);
\r
1000 final Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
\r
1001 while (fieldsIterator.hasNext()) {
\r
1002 final Map.Entry<String, JsonNode> fieldEntry = fieldsIterator.next();
\r
1003 final JsonNode jsonNode = fieldEntry.getValue();
\r
1004 // remove all arrays, objects from A&AI Enrichment Json
\r
1005 if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) {
\r
1006 fieldsIterator.remove();
\r
1009 return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class);
\r
1010 } catch (IOException e) {
\r
1011 LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e);
\r