Add support for ABATED alerts within CDAP TCA
[dcaegen2/analytics/tca.git] / dcae-analytics-tca / src / main / java / org / openecomp / dcae / apod / analytics / tca / utils / TCAUtils.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.tca.utils;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Predicate;
27 import com.google.common.base.Predicates;
28 import com.google.common.base.Supplier;
29 import com.google.common.base.Suppliers;
30 import com.google.common.collect.HashBasedTable;
31 import com.google.common.collect.ImmutableList;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Maps;
35 import com.google.common.collect.Table;
36 import com.jayway.jsonpath.DocumentContext;
37 import com.jayway.jsonpath.JsonPath;
38 import com.jayway.jsonpath.TypeRef;
39 import org.apache.commons.lang3.StringUtils;
40 import org.apache.commons.lang3.tuple.ImmutablePair;
41 import org.apache.commons.lang3.tuple.Pair;
42 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
43 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
44 import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
45 import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
46 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;
47 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;
48 import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
49 import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;
50 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
51 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
52 import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
53 import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
54 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus;
55 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
56 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
57 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
58 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
59 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
60 import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;
61 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
62 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
63 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
64 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
65 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
66 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
67 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
68 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
69 import org.quartz.Job;
70 import org.quartz.JobBuilder;
71 import org.quartz.JobDataMap;
72 import org.quartz.JobDetail;
73 import org.quartz.Scheduler;
74 import org.quartz.SchedulerException;
75 import org.quartz.SimpleScheduleBuilder;
76 import org.quartz.SimpleTrigger;
77 import org.quartz.TriggerBuilder;
78 import org.quartz.impl.StdSchedulerFactory;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
81
82 import java.util.ArrayList;
83 import java.util.Collections;
84 import java.util.Comparator;
85 import java.util.Date;
86 import java.util.HashMap;
87 import java.util.LinkedHashMap;
88 import java.util.LinkedList;
89 import java.util.List;
90 import java.util.Map;
91 import java.util.Properties;
92 import java.util.Set;
93 import java.util.SortedMap;
94 import java.util.TreeMap;
95 import java.util.UUID;
96
97 import javax.annotation.Nonnull;
98 import javax.annotation.Nullable;
99
100 import static com.google.common.collect.Lists.newArrayList;
101 import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
102
103 /**
104  * Utility Helper methods for TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get
105  * pre configured Json Object Mapper understand serialization and deserialization of CEF Message
106  * and TCA Policy
107  *
108  * @author Rajiv Singla . Creation Date: 10/24/2016.
109  */
110 public abstract class TCAUtils extends AnalyticsModelJsonUtils {
111
112     private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
113
114     /**
115      * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
116      * WARNING )
117      */
118     private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
119         @Override
120         public int compare(Threshold threshold1, Threshold threshold2) {
121             return threshold1.getSeverity().compareTo(threshold2.getSeverity());
122         }
123     };
124
125     /**
126      * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}
127      *
128      * @return TCA Policy Metrics Per Event Name list
129      */
130     public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {
131         return new Function<TCAPolicy, List<MetricsPerEventName>>() {
132             @Nullable
133             @Override
134             public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {
135                 return tcaPolicy.getMetricsPerEventName();
136             }
137         };
138     }
139
140     /**
141      * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from
142      * {@link MetricsPerEventName}
143      *
144      * @return Event Names or a Metrics Per Event Name object
145      */
146     public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {
147         return new Function<MetricsPerEventName, String>() {
148             @Override
149             public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {
150                 return metricsPerEventName.getEventName();
151             }
152         };
153     }
154
155
156     /**
157      * Extracts {@link TCAPolicy} Event Names
158      *
159      * @param tcaPolicy TCA Policy
160      * @return List of event names in the TCA Policy
161      */
162     public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {
163         final List<MetricsPerEventName> metricsPerEventNames =
164                 tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
165
166         return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());
167     }
168
169     /**
170      * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to
171      * change during runtime
172      *
173      * @param tcaPolicy TCA Policy
174      * @return a Supplier that memoize the TCA Policy event names
175      */
176     public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {
177         return Suppliers.memoize(new Supplier<List<String>>() {
178             @Override
179             public List<String> get() {
180                 return getPolicyEventNames(tcaPolicy);
181             }
182         });
183     }
184
185
186     /**
187      * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path
188      *
189      * @param tcaPolicy TCA Policy
190      * @return A table with Keys of event name and field path containing List of threshold as values
191      */
192     public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {
193         final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
194         for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {
195             final String eventName = metricsPerEventName.getEventName();
196             final List<Threshold> thresholds = metricsPerEventName.getThresholds();
197             for (Threshold threshold : thresholds) {
198                 final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());
199                 if (existingThresholds == null) {
200                     final LinkedList<Threshold> newThresholdList = new LinkedList<>();
201                     newThresholdList.add(threshold);
202                     domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);
203                 } else {
204                     domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);
205                 }
206             }
207         }
208         return domainFRTable;
209     }
210
211
212     /**
213      * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table
214      *
215      * @param tcaPolicy TCA Policy
216      * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values
217      */
218     public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier
219     (final TCAPolicy tcaPolicy) {
220         return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
221             @Override
222             public Table<String, String, List<Threshold>> get() {
223                 return getPolicyEventNameThresholdsTable(tcaPolicy);
224             }
225         });
226     }
227
228
229     /**
230      * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
231      * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to
232      * filter out messages which does not match policy domain or event Name
233      *
234      * @param cefMessage CEF Message
235      * @param tcaPolicy TCA Policy
236      * @return Message Process Context after processing filter chain
237      */
238     public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
239                                                           @Nonnull final TCAPolicy tcaPolicy) {
240
241         final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
242         final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
243         final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();
244         // Create a list of message processors
245         final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
246                 ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);
247         final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
248         // Create a message processors chain
249         final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
250                 new GenericMessageChainProcessor<>(messageProcessors, processorContext);
251         // process chain
252         return tcaProcessingChain.processChain();
253     }
254
255
256     /**
257      * Extracts json path values for given json Field Paths from using Json path notation. Assumes
258      * that values extracted are always long
259      *
260      * @param message CEF Message
261      * @param jsonFieldPaths Json Field Paths
262      * @return Map containing key as json path and values as values associated with that json path
263      */
264     public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
265             jsonFieldPaths) {
266
267         final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>();
268         final DocumentContext documentContext = JsonPath.parse(message);
269
270         for (String jsonFieldPath : jsonFieldPaths) {
271             final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() {
272             });
273             // If Json Field Values are not or empty
274             if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
275                 // Filter out all null values in the filed values list
276                 final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
277                         Predicates.<Long>notNull()));
278                 // If there are non null values put them in the map
279                 if (!nonNullValues.isEmpty()) {
280                     jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
281                 }
282             }
283         }
284
285         return jsonFieldPathMap;
286     }
287
288     /**
289      * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
290      * it applies threshold in order of their severity and record the first threshold per message field path
291      *
292      * @param messageFieldValues Field Path Values extracted from CEF Message
293      * @param fieldThresholds Policy Thresholds for Field Path
294      * @return Optional of violated threshold for a field path
295      */
296     public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold>
297             fieldThresholds) {
298         // order thresholds by severity
299         Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
300         // Now apply each threshold to field values
301         for (Threshold fieldThreshold : fieldThresholds) {
302             for (Long messageFieldValue : messageFieldValues) {
303                 final Boolean isThresholdViolated =
304                         fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue());
305                 if (isThresholdViolated) {
306                     final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
307                     violatedThreshold.setActualFieldValue(messageFieldValue);
308                     return Optional.of(violatedThreshold);
309                 }
310             }
311         }
312         return Optional.absent();
313     }
314
315     /**
316      * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
317      * Grabs first highest priority violated threshold
318      *
319      * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
320      * @return First Highest priority violated threshold
321      */
322     public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
323
324         final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
325
326         if (violatedThresholds.size() == 1) {
327             return violatedThresholds.get(0);
328         }
329         Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
330         // Just grab the first violated threshold with highest priority
331         return violatedThresholds.get(0);
332     }
333
334
335     /**
336      * Creates {@link MetricsPerEventName} object which contains violated thresholds
337      *
338      * @param tcaPolicy TCA Policy
339      * @param violatedThreshold Violated thresholds
340      * @param eventName Event Name
341      *
342      * @return MetricsPerEventName object containing one highest severity violated threshold
343      */
344     public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
345                                                             @Nonnull final Threshold violatedThreshold,
346                                                             @Nonnull final String eventName) {
347
348         final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(
349                 Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {
350                     @Override
351                     public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {
352                         return metricsPerEventName.getEventName().equals(eventName);
353                     }
354                 }));
355         // TCA policy must have only one metrics per event Name
356         if (metricsPerEventNames.size() == 1) {
357             final MetricsPerEventName violatedMetrics =
358                     MetricsPerEventName.copy(metricsPerEventNames.get(0));
359             violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
360             return violatedMetrics;
361         } else {
362             final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);
363             throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
364         }
365     }
366
367     /**
368      * Computes threshold violations
369      *
370      * @param processorContext Filtered processor Context
371      * @return processor context with any threshold violations
372      */
373     public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
374         final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
375         return policyThresholdsProcessor.apply(processorContext);
376     }
377
378
379     /**
380      * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
381      * formats
382      *
383      * @param processorContextWithViolations processor context which has TCA violations
384      * @param tcaAppName tca app name
385      * @param isAlertInCEFFormat determines if output alert is in CEF format
386      *
387      * @return TCA Alert String
388      *
389      * @throws JsonProcessingException If alert cannot be parsed into JSON String
390      */
391     public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
392                                               final String tcaAppName,
393                                               final Boolean isAlertInCEFFormat) throws JsonProcessingException {
394         if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
395             final EventListener eventListenerWithViolations =
396                     addThresholdViolationFields(processorContextWithViolations);
397             final String alertString = writeValueAsString(eventListenerWithViolations);
398             LOG.debug("Created alert in CEF Format: {}", alertString);
399             return alertString;
400         } else {
401             final TCAVESResponse newTCAVESResponse =
402                     createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
403             final String alertString = writeValueAsString(newTCAVESResponse);
404             LOG.debug("Created alert in Non CEF Format: {}", alertString);
405             return alertString;
406         }
407     }
408
409     /**
410      * Adds threshold violation fields to {@link EventListener}
411      *
412      * @param processorContextWithViolations processor context that contains violations
413      * @return event listener with threshold crossing alert fields populated
414      */
415     public static EventListener addThresholdViolationFields(
416             final TCACEFProcessorContext processorContextWithViolations) {
417
418         final MetricsPerEventName metricsPerEventName =
419                 processorContextWithViolations.getMetricsPerEventName();
420         // confirm violations are indeed present
421         if (metricsPerEventName == null) {
422             final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
423             throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
424         }
425
426         // get violated threshold
427         final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
428         final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
429         final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
430
431         // create new threshold crossing alert fields
432         final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
433         thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
434         thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
435         thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
436         thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
437         thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
438         thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
439         thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
440         thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());
441
442         // create new performance count
443         final PerformanceCounter performanceCounter = new PerformanceCounter();
444         performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
445         performanceCounter.setName(violatedThreshold.getFieldPath());
446         performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
447         performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
448
449         // set additional parameters for threshold crossing alert fields
450         thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
451
452         // add threshold crossing fields to existing event listener
453         eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
454
455         return eventListener;
456     }
457
458     /**
459      * Converts {@link EventSeverity} to {@link Criticality}
460      *
461      * @param eventSeverity event severity
462      *
463      * @return performance counter criticality
464      */
465     private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
466         switch (eventSeverity) {
467             case CRITICAL:
468                 return Criticality.CRIT;
469             case MAJOR:
470                 return Criticality.MAJ;
471             default:
472                 return Criticality.UNKNOWN;
473         }
474     }
475
476     /**
477      * Creates {@link TCAVESResponse} object
478      *
479      * @param processorContext processor Context with violations
480      * @param tcaAppName TCA App Name
481      *
482      * @return TCA VES Response Message
483      */
484     public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
485                                                          final String tcaAppName) {
486
487         final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
488         // confirm violations are indeed present
489         if (metricsPerEventName == null) {
490             final String errorMessage = "No violations metrics. Unable to create VES Response";
491             throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
492         }
493
494         final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
495         final EventListener eventListener = processorContext.getCEFEventListener();
496         final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
497
498         final TCAVESResponse tcavesResponse = new TCAVESResponse();
499         // ClosedLoopControlName included in the DCAE configuration Policy
500         tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
501         // version included in the DCAE configuration Policy
502         tcavesResponse.setVersion(violatedThreshold.getVersion());
503         // Generate a UUID for this output message
504         tcavesResponse.setRequestID(UUID.randomUUID().toString());
505         // commonEventHeader.startEpochMicrosec from the received VES measurementsForVfScaling message
506         tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
507         // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
508         // TODO: Find out how to get this field
509         tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
510
511         final AAI aai = new AAI();
512         tcavesResponse.setAai(aai);
513
514         // VM specific settings
515         if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {
516             // Hard Coded - "VM"
517             tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);
518             // Hard Coded - "vserver.vserver-name"
519             tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);
520             aai.setGenericServerId(commonEventHeader.getReportingEntityName());
521         } else {
522             // VNF specific settings
523             // Hard Coded - "VNF"
524             tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);
525             // Hard Coded - "generic-vnf.vnf-id"
526             tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);
527             // commonEventHeader.reportingEntityName from the received VES measurementsForVfScaling message (value for
528             // the data element used in A&AI)
529             aai.setGenericVNFId(commonEventHeader.getReportingEntityName());
530         }
531
532         // Hard Coded - "DCAE"
533         tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
534         // policyScope included in the DCAE configuration Policy
535         tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());
536         // policyName included in the DCAE configuration Policy
537         tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());
538         // policyVersion included in the DCAE configuration Policy
539         tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());
540         // Extracted from violated threshold
541         tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());
542
543         return tcavesResponse;
544     }
545
546
547     /**
548      * Extract Domain and Event Name from processor context if present
549      *
550      * @param processorContext processor context
551      * @return Tuple of domain and event Name
552      */
553     public static Pair<String, String> getDomainAndEventName(
554             @Nullable final TCACEFProcessorContext processorContext) {
555
556         String domain = null;
557         String eventName = null;
558
559         if (processorContext != null &&
560                 processorContext.getCEFEventListener() != null &&
561                 processorContext.getCEFEventListener().getEvent() != null &&
562                 processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
563             final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
564                     .getCommonEventHeader();
565
566             if (commonEventHeader.getDomain() != null) {
567                 domain = commonEventHeader.getDomain().name();
568             }
569
570             if (commonEventHeader.getEventName() != null) {
571                 eventName = commonEventHeader.getEventName();
572             }
573
574         }
575
576         return new ImmutablePair<>(domain, eventName);
577
578     }
579
580     /**
581      * Creates {@link TCAPolicy} Metrics per Event Name list
582      *
583      * @param eventNamesMap Map containing event Name as key and corresponding values
584      *
585      * @return List of {@link MetricsPerEventName}
586      */
587     public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(
588             final Map<String, Map<String, String>> eventNamesMap) {
589
590         // create a new metrics per event Name list
591         final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();
592
593         for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {
594
595             // create new metrics per event Name instance
596             final MetricsPerEventName newMetricsPerEventName =
597                     createNewMetricsPerEventName(eventNamesEntry);
598             metricsPerEventNames.add(newMetricsPerEventName);
599
600             // determine all threshold related values
601             final Map<String, String> thresholdsValuesMaps =
602                     filterMapByKeyNamePrefix(eventNamesEntry.getValue(),
603                             AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
604
605             // create a map of all threshold values
606             final Map<String, Map<String, String>> thresholdsMap =
607                     extractSubTree(thresholdsValuesMaps, 1, 2,
608                             AnalyticsConstants.TCA_POLICY_DELIMITER);
609
610             // add thresholds to nmetrics per event Names threshold list
611             for (Map<String, String> thresholdMap : thresholdsMap.values()) {
612                 newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));
613             }
614
615         }
616
617         return metricsPerEventNames;
618     }
619
620     /**
621      * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
622      *
623      * @param thresholdMap threshold map with threshold values
624      *
625      * @return new instance of TCA Policy Threshold
626      */
627     public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
628         final Threshold threshold = new Threshold();
629         threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
630         threshold.setVersion(thresholdMap.get("policy.version"));
631         threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
632         threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
633         threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
634         threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
635         threshold.setClosedLoopEventStatus(
636                 ControlLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));
637         return threshold;
638     }
639
640     /**
641      * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope
642      * extracted from given eventNamesEntry
643      *
644      * @param eventNamesEntry Event Names Entry
645      *
646      * @return new instance of MetricsPerEventName
647      */
648     public static MetricsPerEventName createNewMetricsPerEventName(
649             final Map.Entry<String, Map<String, String>> eventNamesEntry) {
650         // determine event Name
651         final String eventName = eventNamesEntry.getKey();
652         // determine event Name thresholds
653         final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();
654         final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();
655         final List<Threshold> thresholds = new LinkedList<>();
656         metricsPerEventName.setThresholds(thresholds);
657         metricsPerEventName.setEventName(eventName);
658         // bind policyName, policyVersion, policyScope and closedLoopControlName
659         metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));
660         metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));
661         metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));
662         metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(
663                 metricsPerEventNameThresholdsMap.get("closedLoopControlName")));
664         return metricsPerEventName;
665     }
666
667     /**
668      * Converts a flattened key/value map which has keys delimited by a given delimiter.
669      * The start Index and end index extract the sub-key value and returns a new map containing
670      * sub-keys and values.
671      *
672      * @param actualMap actual Map
673      * @param startIndex start index
674      * @param endIndex end index
675      * @param delimiter delimiter
676      *
677      * @return Map with new sub tree map
678      */
679     public static Map<String, Map<String, String>> extractSubTree(
680             final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
681
682         final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
683
684         // iterate over actual map entries
685         for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
686             final String actualMapKey = actualMapEntry.getKey();
687             final String actualMapValue = actualMapEntry.getValue();
688
689             // determine delimiter start and end index
690             final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
691             final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
692             final int keyLength = actualMapKey.length();
693
694             // extract sub-tree map
695             if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
696                 final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
697                 final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
698                 final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
699                 if (existingThresholdMap == null) {
700                     Map<String, String> newThresholdMap = new LinkedHashMap<>();
701                     newThresholdMap.put(subMapKey, actualMapValue);
702                     subTreeMap.put(thresholdKey, newThresholdMap);
703                 } else {
704                     existingThresholdMap.put(subMapKey, actualMapValue);
705                 }
706
707             }
708         }
709
710         return subTreeMap;
711
712     }
713
714
715     /**
716      * Provides a view of underlying map that filters out entries with keys starting with give prefix
717      *
718      * @param actualMap Target map that needs to be filtered
719      * @param keyNamePrefix key prefix
720      *
721      * @return a view of actual map which only show entries which have give prefix
722      */
723     public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
724                                                                final String keyNamePrefix) {
725         return Maps.filterKeys(actualMap,
726                 new Predicate<String>() {
727                     @Override
728                     public boolean apply(@Nullable String key) {
729                         return key != null && key.startsWith(keyNamePrefix);
730                     }
731                 });
732     }
733
734
735     /**
736      * Creates Quartz Scheduler
737      *
738      * @param pollingIntervalMS polling interval
739      * @param stdSchedulerFactory Quartz standard schedule factory instance
740      * @param quartzPublisherPropertiesFileName quartz properties file name
741      * @param jobDataMap job Data map
742      * @param quartzJobClass Quartz Job Class
743      * @param quartzJobName Quartz Job Name
744      * @param quartzTriggerName Quartz Trigger name
745      *
746      * @param <T> An implementation of Quartz {@link Job} interface
747      * @return Configured Quartz Scheduler
748      *
749      * @throws SchedulerException exception if unable to create to Quartz Scheduler
750      */
751     public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
752             final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
753             final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
754             final String quartzTriggerName) throws SchedulerException {
755
756         // Initialize a new Quartz Standard scheduler
757         LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
758                 quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
759         final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
760                 quartzPublisherPropertiesFileName, new Properties());
761         stdSchedulerFactory.initialize(quartzProperties);
762         final Scheduler scheduler = stdSchedulerFactory.getScheduler();
763
764         // Create a new job detail
765         final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
766                 AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
767
768         // Create a new scheduling builder
769         final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
770                 .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
771                 .repeatForever(); // repeats while worker is running
772
773         // Create a trigger for the TCA Publisher Job
774         final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
775                 .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
776                 .startNow() // job starts right away
777                 .withSchedule(simpleScheduleBuilder).build();
778
779         scheduler.scheduleJob(jobDetail, simpleTrigger);
780         LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());
781         return scheduler;
782     }
783
784 }