TCA: Replace any openecomp reference by onap
[dcaegen2/analytics/tca.git] / dcae-analytics-tca / src / main / java / org / onap / dcae / apod / analytics / tca / utils / TCAUtils.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.dcae.apod.analytics.tca.utils;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.JsonNode;
25 import com.google.common.base.Function;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Predicate;
28 import com.google.common.base.Predicates;
29 import com.google.common.base.Supplier;
30 import com.google.common.base.Suppliers;
31 import com.google.common.collect.HashBasedTable;
32 import com.google.common.collect.ImmutableList;
33 import com.google.common.collect.ImmutableMap;
34 import com.google.common.collect.Iterables;
35 import com.google.common.collect.Lists;
36 import com.google.common.collect.Maps;
37 import com.google.common.collect.Table;
38 import com.jayway.jsonpath.DocumentContext;
39 import com.jayway.jsonpath.JsonPath;
40 import com.jayway.jsonpath.TypeRef;
41 import org.apache.commons.lang3.StringUtils;
42 import org.apache.commons.lang3.tuple.ImmutablePair;
43 import org.apache.commons.lang3.tuple.Pair;
44 import org.onap.dcae.apod.analytics.aai.service.AAIEnrichmentClient;
45 import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
46 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
47 import org.onap.dcae.apod.analytics.common.exception.MessageProcessingException;
48 import org.onap.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
49 import org.onap.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
50 import org.onap.dcae.apod.analytics.model.domain.cef.AlertAction;
51 import org.onap.dcae.apod.analytics.model.domain.cef.AlertType;
52 import org.onap.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
53 import org.onap.dcae.apod.analytics.model.domain.cef.Criticality;
54 import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
55 import org.onap.dcae.apod.analytics.model.domain.cef.EventSeverity;
56 import org.onap.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
57 import org.onap.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
58 import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
59 import org.onap.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
60 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Direction;
61 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
62 import org.onap.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
63 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
64 import org.onap.dcae.apod.analytics.model.facade.tca.AAI;
65 import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
66 import org.onap.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
67 import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
68 import org.onap.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
69 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
70 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
71 import org.onap.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
72 import org.onap.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
73 import org.quartz.Job;
74 import org.quartz.JobBuilder;
75 import org.quartz.JobDataMap;
76 import org.quartz.JobDetail;
77 import org.quartz.Scheduler;
78 import org.quartz.SchedulerException;
79 import org.quartz.SimpleScheduleBuilder;
80 import org.quartz.SimpleTrigger;
81 import org.quartz.TriggerBuilder;
82 import org.quartz.impl.StdSchedulerFactory;
83 import org.slf4j.Logger;
84 import org.slf4j.LoggerFactory;
85
86 import java.io.IOException;
87 import java.math.BigDecimal;
88 import java.util.ArrayList;
89 import java.util.Collections;
90 import java.util.Comparator;
91 import java.util.Date;
92 import java.util.HashMap;
93 import java.util.Iterator;
94 import java.util.LinkedHashMap;
95 import java.util.LinkedList;
96 import java.util.List;
97 import java.util.Map;
98 import java.util.Properties;
99 import java.util.Set;
100 import java.util.SortedMap;
101 import java.util.TreeMap;
102 import java.util.UUID;
103
104 import javax.annotation.Nonnull;
105 import javax.annotation.Nullable;
106
107 import static com.google.common.collect.Lists.newArrayList;
108 import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
109
110 /**
111  * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get
112  * pre configured Json Object Mapper understand serialization and deserialization of CEF Message
113  * and TCA Policy
114  *
115  * @author Rajiv Singla . Creation Date: 10/24/2016.
116  */
117 public abstract class TCAUtils extends AnalyticsModelJsonUtils {
118
119     private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
120
121     /**
122      * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
123      * WARNING )
124      */
125     private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
126         @Override
127         public int compare(Threshold threshold1, Threshold threshold2) {
128             return threshold1.getSeverity().compareTo(threshold2.getSeverity());
129         }
130     };
131
132     /**
133      * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}
134      *
135      * @return TCA Policy Metrics Per Event Name list
136      */
137     public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {
138         return new Function<TCAPolicy, List<MetricsPerEventName>>() {
139             @Nullable
140             @Override
141             public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {
142                 return tcaPolicy.getMetricsPerEventName();
143             }
144         };
145     }
146
147     /**
148      * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from
149      * {@link MetricsPerEventName}
150      *
151      * @return Event Names or a Metrics Per Event Name object
152      */
153     public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {
154         return new Function<MetricsPerEventName, String>() {
155             @Override
156             public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {
157                 return metricsPerEventName.getEventName();
158             }
159         };
160     }
161
162
163     /**
164      * Extracts {@link TCAPolicy} Event Names
165      *
166      * @param tcaPolicy TCA Policy
167      * @return List of event names in the TCA Policy
168      */
169     public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {
170         final List<MetricsPerEventName> metricsPerEventNames =
171                 tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
172
173         return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());
174     }
175
176     /**
177      * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to
178      * change during runtime
179      *
180      * @param tcaPolicy TCA Policy
181      * @return a Supplier that memoize the TCA Policy event names
182      */
183     public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {
184         return Suppliers.memoize(new Supplier<List<String>>() {
185             @Override
186             public List<String> get() {
187                 return getPolicyEventNames(tcaPolicy);
188             }
189         });
190     }
191
192
193     /**
194      * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path
195      *
196      * @param tcaPolicy TCA Policy
197      * @return A table with Keys of event name and field path containing List of threshold as values
198      */
199     public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {
200         final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
201         for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {
202             final String eventName = metricsPerEventName.getEventName();
203             final List<Threshold> thresholds = metricsPerEventName.getThresholds();
204             for (Threshold threshold : thresholds) {
205                 final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());
206                 if (existingThresholds == null) {
207                     final LinkedList<Threshold> newThresholdList = new LinkedList<>();
208                     newThresholdList.add(threshold);
209                     domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);
210                 } else {
211                     domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);
212                 }
213             }
214         }
215         return domainFRTable;
216     }
217
218
219     /**
220      * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table
221      *
222      * @param tcaPolicy TCA Policy
223      * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values
224      */
225     public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier
226     (final TCAPolicy tcaPolicy) {
227         return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
228             @Override
229             public Table<String, String, List<Threshold>> get() {
230                 return getPolicyEventNameThresholdsTable(tcaPolicy);
231             }
232         });
233     }
234
235
236     /**
237      * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
238      * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to
239      * filter out messages which does not match policy domain or event Name
240      *
241      * @param cefMessage CEF Message
242      * @param tcaPolicy TCA Policy
243      * @return Message Process Context after processing filter chain
244      */
245     public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
246                                                           @Nonnull final TCAPolicy tcaPolicy) {
247
248         final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
249         final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
250         final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();
251         // Create a list of message processors
252         final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
253                 ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);
254         final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
255         // Create a message processors chain
256         final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
257                 new GenericMessageChainProcessor<>(messageProcessors, processorContext);
258         // process chain
259         return tcaProcessingChain.processChain();
260     }
261
262
263     /**
264      * Extracts json path values for given json Field Paths from using Json path notation. Assumes
265      * that values extracted are always long
266      *
267      * @param message CEF Message
268      * @param jsonFieldPaths Json Field Paths
269      * @return Map containing key as json path and values as values associated with that json path
270      */
271     public static Map<String, List<BigDecimal>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
272             jsonFieldPaths) {
273
274         final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
275         final DocumentContext documentContext = JsonPath.parse(message);
276
277         for (String jsonFieldPath : jsonFieldPaths) {
278             List<BigDecimal> jsonFieldValues = null;
279
280             try {
281                 jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
282                 });
283             } catch (Exception e) {
284                 final String errorMessage = String.format(
285                         "Unable to convert jsonFieldPath: %s value to valid number. " +
286                                 "Json Path value is not in a valid number format. Incoming message: %s",
287                         jsonFieldPath, message);
288                 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
289             }
290             // If Json Field Values are not or empty
291             if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
292                 // Filter out all null values in the filed values list
293                 final List<BigDecimal> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
294                         Predicates.<BigDecimal>notNull()));
295                 // If there are non null values put them in the map
296                 if (!nonNullValues.isEmpty()) {
297                     jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
298                 }
299             }
300         }
301
302         return jsonFieldPathMap;
303     }
304
305     /**
306      * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
307      * it applies threshold in order of their severity and record the first threshold per message field path
308      *
309      * @param messageFieldValues Field Path Values extracted from CEF Message
310      * @param fieldThresholds Policy Thresholds for Field Path
311      * @return Optional of violated threshold for a field path
312      */
313     public static Optional<Threshold> thresholdCalculator(final List<BigDecimal> messageFieldValues, final
314     List<Threshold>
315             fieldThresholds) {
316         // order thresholds by severity
317         Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
318         // Now apply each threshold to field values
319         for (Threshold fieldThreshold : fieldThresholds) {
320             for (BigDecimal messageFieldValue : messageFieldValues) {
321                 final Boolean isThresholdViolated =
322                         fieldThreshold.getDirection().operate(messageFieldValue, new BigDecimal(fieldThreshold
323                                 .getThresholdValue()));
324                 if (isThresholdViolated) {
325                     final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
326                     violatedThreshold.setActualFieldValue(messageFieldValue);
327                     return Optional.of(violatedThreshold);
328                 }
329             }
330         }
331         return Optional.absent();
332     }
333
334     /**
335      * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
336      * Grabs first highest priority violated threshold
337      *
338      * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
339      * @return First Highest priority violated threshold
340      */
341     public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
342
343         final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
344
345         if (violatedThresholds.size() == 1) {
346             return violatedThresholds.get(0);
347         }
348         Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
349         // Just grab the first violated threshold with highest priority
350         return violatedThresholds.get(0);
351     }
352
353
354     /**
355      * Creates {@link MetricsPerEventName} object which contains violated thresholds
356      *
357      * @param tcaPolicy TCA Policy
358      * @param violatedThreshold Violated thresholds
359      * @param eventName Event Name
360      *
361      * @return MetricsPerEventName object containing one highest severity violated threshold
362      */
363     public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
364                                                             @Nonnull final Threshold violatedThreshold,
365                                                             @Nonnull final String eventName) {
366
367         final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(
368                 Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {
369                     @Override
370                     public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {
371                         return metricsPerEventName.getEventName().equals(eventName);
372                     }
373                 }));
374         // TCA policy must have only one metrics per event Name
375         if (metricsPerEventNames.size() == 1) {
376             final MetricsPerEventName violatedMetrics =
377                     MetricsPerEventName.copy(metricsPerEventNames.get(0));
378             violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
379             return violatedMetrics;
380         } else {
381             final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);
382             throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
383         }
384     }
385
386     /**
387      * Computes threshold violations
388      *
389      * @param processorContext Filtered processor Context
390      * @return processor context with any threshold violations
391      */
392     public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
393         final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
394         return policyThresholdsProcessor.apply(processorContext);
395     }
396
397
398     /**
399      * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
400      * formats
401      *
402      * @param processorContextWithViolations processor context which has TCA violations
403      * @param tcaAppName tca app name
404      * @param isAlertInCEFFormat determines if output alert is in CEF format
405      *
406      * @return TCA Alert String
407      *
408      * @throws JsonProcessingException If alert cannot be parsed into JSON String
409      */
410     public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
411                                               final String tcaAppName,
412                                               final Boolean isAlertInCEFFormat) throws JsonProcessingException {
413         if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
414             final EventListener eventListenerWithViolations =
415                     addThresholdViolationFields(processorContextWithViolations);
416             final String alertString = writeValueAsString(eventListenerWithViolations);
417             LOG.debug("Created alert in CEF Format: {}", alertString);
418             return alertString;
419         } else {
420             final TCAVESResponse newTCAVESResponse =
421                     createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
422             final String alertString = writeValueAsString(newTCAVESResponse);
423             LOG.debug("Created alert in Non CEF Format: {}", alertString);
424             return alertString;
425         }
426     }
427
428     /**
429      * Adds threshold violation fields to {@link EventListener}
430      *
431      * @param processorContextWithViolations processor context that contains violations
432      * @return event listener with threshold crossing alert fields populated
433      */
434     public static EventListener addThresholdViolationFields(
435             final TCACEFProcessorContext processorContextWithViolations) {
436
437         final MetricsPerEventName metricsPerEventName =
438                 processorContextWithViolations.getMetricsPerEventName();
439         // confirm violations are indeed present
440         if (metricsPerEventName == null) {
441             final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
442             throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
443         }
444
445         // get violated threshold
446         final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
447         final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
448         final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
449
450         // create new threshold crossing alert fields
451         final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
452         thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
453         thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
454         thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
455         thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
456         thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
457         thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
458         thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
459         thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());
460
461         // create new performance count
462         final PerformanceCounter performanceCounter = new PerformanceCounter();
463         performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
464         performanceCounter.setName(violatedThreshold.getFieldPath());
465         performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
466         performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
467
468         // set additional parameters for threshold crossing alert fields
469         thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
470
471         // add threshold crossing fields to existing event listener
472         eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
473
474         return eventListener;
475     }
476
477     /**
478      * Converts {@link EventSeverity} to {@link Criticality}
479      *
480      * @param eventSeverity event severity
481      *
482      * @return performance counter criticality
483      */
484     private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
485         switch (eventSeverity) {
486             case CRITICAL:
487                 return Criticality.CRIT;
488             case MAJOR:
489                 return Criticality.MAJ;
490             default:
491                 return Criticality.UNKNOWN;
492         }
493     }
494
495     /**
496      * Creates {@link TCAVESResponse} object
497      *
498      * @param processorContext processor Context with violations
499      * @param tcaAppName TCA App Name
500      *
501      * @return TCA VES Response Message
502      */
503     public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
504                                                          final String tcaAppName) {
505
506         final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
507         // confirm violations are indeed present
508         if (metricsPerEventName == null) {
509             final String errorMessage = "No violations metrics. Unable to create VES Response";
510             throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
511         }
512
513         final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
514         final EventListener eventListener = processorContext.getCEFEventListener();
515         final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
516
517         final TCAVESResponse tcavesResponse = new TCAVESResponse();
518         // ClosedLoopControlName included in the DCAE configuration Policy
519         tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
520         // version included in the DCAE configuration Policy
521         tcavesResponse.setVersion(violatedThreshold.getVersion());
522         // Generate a UUID for this output message
523         tcavesResponse.setRequestID(UUID.randomUUID().toString());
524         // commonEventHeader.startEpochMicrosec from the received VES message
525         tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
526         // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts
527         if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) {
528             tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec());
529         }
530         // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
531         tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
532
533         final AAI aai = new AAI();
534         tcavesResponse.setAai(aai);
535
536         // VM specific settings
537         if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {
538             // Hard Coded - "VM"
539             tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);
540             // Hard Coded - "vserver.vserver-name"
541             tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);
542             // commonEventHeader.sourceName from the received VES message
543             aai.setGenericServerName(commonEventHeader.getSourceName());
544         } else {
545             // VNF specific settings
546             // Hard Coded - "VNF"
547             tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);
548             // Hard Coded - "generic-vnf.vnf-name"
549             tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);
550             // commonEventHeader.sourceName from the received VES message
551             aai.setGenericVNFName(commonEventHeader.getSourceName());
552         }
553
554         // Hard Coded - "DCAE"
555         tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
556         // policyScope included in the DCAE configuration Policy
557         tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());
558         // policyName included in the DCAE configuration Policy
559         tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());
560         // policyVersion included in the DCAE configuration Policy
561         tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());
562         // Extracted from violated threshold
563         tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());
564
565         return tcavesResponse;
566     }
567
568
569     /**
570      * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
571      *
572      * @param tcavesResponse alert
573      *
574      * @return control Loop Schema Type
575      */
576     public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) {
577         final AAI aai = tcavesResponse.getAai();
578         if (aai.getGenericServerName() != null) {
579             return ControlLoopSchemaType.VM;
580         } else {
581             return ControlLoopSchemaType.VNF;
582         }
583     }
584
585     /**
586      * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert
587      *
588      * @param tcavesResponse {@link TCAVESResponse} TCA alert
589      *
590      * @return Source name
591      */
592     public static String determineSourceName(final TCAVESResponse tcavesResponse) {
593         final AAI aai = tcavesResponse.getAai();
594         if (aai.getGenericServerName() != null) {
595             return aai.getGenericServerName();
596         } else {
597             return aai.getGenericVNFName();
598         }
599     }
600
601
602     /**
603      * Extract Domain and Event Name from processor context if present
604      *
605      * @param processorContext processor context
606      * @return Tuple of domain and event Name
607      */
608     public static Pair<String, String> getDomainAndEventName(
609             @Nullable final TCACEFProcessorContext processorContext) {
610
611         String domain = null;
612         String eventName = null;
613
614         if (processorContext != null &&
615                 processorContext.getCEFEventListener() != null &&
616                 processorContext.getCEFEventListener().getEvent() != null &&
617                 processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
618             final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
619                     .getCommonEventHeader();
620
621             if (commonEventHeader.getDomain() != null) {
622                 domain = commonEventHeader.getDomain().name();
623             }
624
625             if (commonEventHeader.getEventName() != null) {
626                 eventName = commonEventHeader.getEventName();
627             }
628
629         }
630
631         return new ImmutablePair<>(domain, eventName);
632
633     }
634
635     /**
636      * Creates {@link TCAPolicy} Metrics per Event Name list
637      *
638      * @param eventNamesMap Map containing event Name as key and corresponding values
639      *
640      * @return List of {@link MetricsPerEventName}
641      */
642     public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(
643             final Map<String, Map<String, String>> eventNamesMap) {
644
645         // create a new metrics per event Name list
646         final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();
647
648         for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {
649
650             // create new metrics per event Name instance
651             final MetricsPerEventName newMetricsPerEventName =
652                     createNewMetricsPerEventName(eventNamesEntry);
653             metricsPerEventNames.add(newMetricsPerEventName);
654
655             // determine all threshold related values
656             final Map<String, String> thresholdsValuesMaps =
657                     filterMapByKeyNamePrefix(eventNamesEntry.getValue(),
658                             AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
659
660             // create a map of all threshold values
661             final Map<String, Map<String, String>> thresholdsMap =
662                     extractSubTree(thresholdsValuesMaps, 1, 2,
663                             AnalyticsConstants.TCA_POLICY_DELIMITER);
664
665             // add thresholds to nmetrics per event Names threshold list
666             for (Map<String, String> thresholdMap : thresholdsMap.values()) {
667                 newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));
668             }
669
670         }
671
672         return metricsPerEventNames;
673     }
674
675     /**
676      * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
677      *
678      * @param thresholdMap threshold map with threshold values
679      *
680      * @return new instance of TCA Policy Threshold
681      */
682     public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
683         final Threshold threshold = new Threshold();
684         threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
685         threshold.setVersion(thresholdMap.get("policy.version"));
686         threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
687         threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
688         threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
689         threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
690         threshold.setClosedLoopEventStatus(
691                 ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));
692         return threshold;
693     }
694
695     /**
696      * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope
697      * extracted from given eventNamesEntry
698      *
699      * @param eventNamesEntry Event Names Entry
700      *
701      * @return new instance of MetricsPerEventName
702      */
703     public static MetricsPerEventName createNewMetricsPerEventName(
704             final Map.Entry<String, Map<String, String>> eventNamesEntry) {
705         // determine event Name
706         final String eventName = eventNamesEntry.getKey();
707         // determine event Name thresholds
708         final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();
709         final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();
710         final List<Threshold> thresholds = new LinkedList<>();
711         metricsPerEventName.setThresholds(thresholds);
712         metricsPerEventName.setEventName(eventName);
713         // bind policyName, policyVersion, policyScope and closedLoopControlName
714         metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));
715         metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));
716         metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));
717         metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(
718                 metricsPerEventNameThresholdsMap.get("controlLoopSchemaType")));
719         return metricsPerEventName;
720     }
721
722     /**
723      * Converts a flattened key/value map which has keys delimited by a given delimiter.
724      * The start Index and end index extract the sub-key value and returns a new map containing
725      * sub-keys and values.
726      *
727      * @param actualMap actual Map
728      * @param startIndex start index
729      * @param endIndex end index
730      * @param delimiter delimiter
731      *
732      * @return Map with new sub tree map
733      */
734     public static Map<String, Map<String, String>> extractSubTree(
735             final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
736
737         final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
738
739         // iterate over actual map entries
740         for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
741             final String actualMapKey = actualMapEntry.getKey();
742             final String actualMapValue = actualMapEntry.getValue();
743
744             // determine delimiter start and end index
745             final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
746             final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
747             final int keyLength = actualMapKey.length();
748
749             // extract sub-tree map
750             if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
751                 final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
752                 final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
753                 final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
754                 if (existingThresholdMap == null) {
755                     Map<String, String> newThresholdMap = new LinkedHashMap<>();
756                     newThresholdMap.put(subMapKey, actualMapValue);
757                     subTreeMap.put(thresholdKey, newThresholdMap);
758                 } else {
759                     existingThresholdMap.put(subMapKey, actualMapValue);
760                 }
761
762             }
763         }
764
765         return subTreeMap;
766
767     }
768
769
770     /**
771      * Provides a view of underlying map that filters out entries with keys starting with give prefix
772      *
773      * @param actualMap Target map that needs to be filtered
774      * @param keyNamePrefix key prefix
775      *
776      * @return a view of actual map which only show entries which have give prefix
777      */
778     public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
779                                                                final String keyNamePrefix) {
780         return Maps.filterKeys(actualMap,
781                 new Predicate<String>() {
782                     @Override
783                     public boolean apply(@Nullable String key) {
784                         return key != null && key.startsWith(keyNamePrefix);
785                     }
786                 });
787     }
788
789
790     /**
791      * Creates Quartz Scheduler
792      *
793      * @param pollingIntervalMS polling interval
794      * @param stdSchedulerFactory Quartz standard schedule factory instance
795      * @param quartzPublisherPropertiesFileName quartz properties file name
796      * @param jobDataMap job Data map
797      * @param quartzJobClass Quartz Job Class
798      * @param quartzJobName Quartz Job Name
799      * @param quartzTriggerName Quartz Trigger name
800      *
801      * @param <T> An implementation of Quartz {@link Job} interface
802      * @return Configured Quartz Scheduler
803      *
804      * @throws SchedulerException exception if unable to create to Quartz Scheduler
805      */
806     public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
807             final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
808             final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
809             final String quartzTriggerName) throws SchedulerException {
810
811         // Initialize a new Quartz Standard scheduler
812         LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
813                 quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
814         final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
815                 quartzPublisherPropertiesFileName, new Properties());
816         stdSchedulerFactory.initialize(quartzProperties);
817         final Scheduler scheduler = stdSchedulerFactory.getScheduler();
818
819         // Create a new job detail
820         final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
821                 AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
822
823         // Create a new scheduling builder
824         final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
825                 .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
826                 .repeatForever(); // repeats while worker is running
827
828         // Create a trigger for the TCA Publisher Job
829         final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
830                 .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
831                 .startNow() // job starts right away
832                 .withSchedule(simpleScheduleBuilder).build();
833
834         scheduler.scheduleJob(jobDetail, simpleTrigger);
835         LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());
836         return scheduler;
837     }
838
839
840     /**
841      * Does A&AI Enrichment for VM
842      *
843      * @param tcavesResponse Outgoing alert object
844      * @param aaiEnrichmentClient A&AI Enrichment client
845      * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path
846      * @param alertString alert String
847      * @param vmSourceName vm source name
848      */
849     public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse,
850                                          final AAIEnrichmentClient aaiEnrichmentClient,
851                                          final String aaiVMEnrichmentAPIPath,
852                                          final String alertString,
853                                          final String vmSourceName) {
854
855         final String filterString = "vserver-name:EQUALS:" + vmSourceName;
856         final ImmutableMap<String, String> queryParams = ImmutableMap.of(
857                 "search-node-type", "vserver", "filter", filterString);
858
859         // fetch vm object resource Link from A&AI
860         final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails(
861                 aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
862         final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails);
863
864         if (vmObjectResourceLink == null) {
865             LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " +
866                     "determined for vmSourceName: {}.", alertString, vmSourceName);
867         } else {
868
869             LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}",
870                     vmSourceName, vmObjectResourceLink);
871
872             // fetch vm A&AI Enrichment
873             final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
874                     vmObjectResourceLink, Collections.<String, String>emptyMap(), createAAIEnrichmentHeaders());
875
876             // enrich AAI
877             enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString,
878                     AnalyticsConstants.AAI_VSERVER_KEY_PREFIX);
879         }
880
881
882     }
883
884
885     /**
886      * Does A&AI Enrichment for VNF
887      *
888      * @param tcavesResponse Outgoing alert object
889      * @param aaiEnrichmentClient A&AI Enrichment client
890      * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path
891      * @param alertString alert String
892      * @param vnfSourceName vnf source name
893      */
894     public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse,
895                                           final AAIEnrichmentClient aaiEnrichmentClient,
896                                           final String aaiVNFEnrichmentAPIPath,
897                                           final String alertString,
898                                           final String vnfSourceName) {
899         final ImmutableMap<String, String> queryParams = ImmutableMap.of("vnf-name", vnfSourceName);
900
901         // fetch vnf A&AI Enrichment
902         final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(
903                 aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());
904
905         // enrich alert AAI
906         enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX);
907     }
908
909     /**
910      * Fetches VM Object Resource Link from A&AI Resource Link Json
911      *
912      * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json
913      *
914      * @return object resource link String
915      */
916     private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) {
917         if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) {
918             try {
919                 final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails);
920                 final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link");
921                 if (!resourceLinkJsonNode.isMissingNode()) {
922                     return resourceLinkJsonNode.asText();
923                 }
924             } catch (IOException e) {
925                 LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}",
926                         vmAAIResourceLinkDetails, e);
927             }
928         }
929         return null;
930     }
931
932     /**
933      * Creates Http Headers for A&AI Enrichment client
934      *
935      * @return Http Headers Map for A&AI Enrichment client
936      */
937     private static Map<String, String> createAAIEnrichmentHeaders() {
938         final Map<String, String> aaiEnrichmentHeaders = new LinkedHashMap<>();
939         final String transactionId = Long.toString(new Date().getTime());
940         aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca");
941         aaiEnrichmentHeaders.put("X-TransactionId", transactionId);
942         aaiEnrichmentHeaders.put("Accept", "application/json");
943         aaiEnrichmentHeaders.put("Real-Time", "true");
944         aaiEnrichmentHeaders.put("Content-Type", "application/json");
945         return aaiEnrichmentHeaders;
946     }
947
948
949     /**
950      * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object
951      *
952      * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details
953      * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String
954      * @param alertString Alert String
955      * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record
956      */
957     private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails,
958                                   final String alertString, final String keyPrefix) {
959
960         if (aaiEnrichmentDetails == null) {
961             LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." +
962                     "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString);
963
964         } else {
965
966             final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails);
967
968             if (enrichmentDetailsAAI != null) {
969                 final Set<Map.Entry<String, Object>> enrichedAAIEntrySet =
970                         enrichmentDetailsAAI.getDynamicProperties().entrySet();
971                 final Map<String, Object> preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties();
972
973                 // populate A&AI Enrichment details and add prefix to key
974                 for (Map.Entry<String, Object> enrichedAAIEntry : enrichedAAIEntrySet) {
975                     preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(),
976                             enrichedAAIEntry.getValue());
977                 }
978
979                 LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}",
980                         alertString, preEnrichmentAAI);
981             } else {
982                 LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." +
983                                 "Skipping Enrichment for alert message: {}",
984                         preEnrichmentAAI, aaiEnrichmentDetails, alertString);
985             }
986         }
987
988     }
989
990     /**
991      * Creates a new A&AI object with only top level A&AI Enrichment details
992      *
993      * @param aaiEnrichmentDetails A&AI Enrichment details
994      *
995      * @return new A&AI with only top level A&AI Enrichment details
996      */
997     private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) {
998         try {
999             final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails);
1000             final Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
1001             while (fieldsIterator.hasNext()) {
1002                 final Map.Entry<String, JsonNode> fieldEntry = fieldsIterator.next();
1003                 final JsonNode jsonNode = fieldEntry.getValue();
1004                 // remove all arrays, objects from A&AI Enrichment Json
1005                 if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) {
1006                     fieldsIterator.remove();
1007                 }
1008             }
1009             return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class);
1010         } catch (IOException e) {
1011             LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e);
1012         }
1013         return null;
1014     }
1015
1016 }