Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-tca / src / main / java / org / openecomp / dcae / apod / analytics / tca / utils / TCAUtils.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.tca.utils;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Predicate;
27 import com.google.common.base.Predicates;
28 import com.google.common.base.Supplier;
29 import com.google.common.base.Suppliers;
30 import com.google.common.collect.HashBasedTable;
31 import com.google.common.collect.ImmutableList;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Maps;
35 import com.google.common.collect.Table;
36 import com.jayway.jsonpath.DocumentContext;
37 import com.jayway.jsonpath.JsonPath;
38 import com.jayway.jsonpath.TypeRef;
39 import org.apache.commons.lang3.StringUtils;
40 import org.apache.commons.lang3.tuple.ImmutablePair;
41 import org.apache.commons.lang3.tuple.Pair;
42 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
43 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
44 import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
45 import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
46 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;
47 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;
48 import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
49 import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;
50 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
51 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
52 import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
53 import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
54 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
55 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole;
56 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
57 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
58 import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;
59 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
60 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
61 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
62 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
63 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
64 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyFunctionalRoleFilter;
65 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
66 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
67 import org.quartz.Job;
68 import org.quartz.JobBuilder;
69 import org.quartz.JobDataMap;
70 import org.quartz.JobDetail;
71 import org.quartz.Scheduler;
72 import org.quartz.SchedulerException;
73 import org.quartz.SimpleScheduleBuilder;
74 import org.quartz.SimpleTrigger;
75 import org.quartz.TriggerBuilder;
76 import org.quartz.impl.StdSchedulerFactory;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
79
80 import java.util.ArrayList;
81 import java.util.Collections;
82 import java.util.Comparator;
83 import java.util.Date;
84 import java.util.HashMap;
85 import java.util.LinkedHashMap;
86 import java.util.LinkedList;
87 import java.util.List;
88 import java.util.Map;
89 import java.util.Properties;
90 import java.util.Set;
91 import java.util.SortedMap;
92 import java.util.TreeMap;
93 import java.util.UUID;
94
95 import javax.annotation.Nonnull;
96 import javax.annotation.Nullable;
97
98 import static com.google.common.collect.Lists.newArrayList;
99 import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
100
101 /**
 * Utility helper methods for the TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to reuse its
 * pre-configured JSON object mapper, which understands serialization and deserialization of CEF messages
 * and TCA policies
105  *
106  * @author Rajiv Singla . Creation Date: 10/24/2016.
107  */
108 public abstract class TCAUtils extends AnalyticsModelJsonUtils {
109
110     private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
111
112     /**
113      * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
114      * WARNING )
115      */
116     private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
117         @Override
118         public int compare(Threshold threshold1, Threshold threshold2) {
119             return threshold1.getSeverity().compareTo(threshold2.getSeverity());
120         }
121     };
122
123     /**
124      * {@link Function} that extracts {@link TCAPolicy#getMetricsPerFunctionalRole()} from {@link TCAPolicy}
125      *
126      * @return TCA Policy Metrics Per Functional Roles List
127      */
128     public static Function<TCAPolicy, List<MetricsPerFunctionalRole>> tcaPolicyMetricsExtractorFunction() {
129         return new Function<TCAPolicy, List<MetricsPerFunctionalRole>>() {
130             @Nullable
131             @Override
132             public List<MetricsPerFunctionalRole> apply(@Nonnull TCAPolicy tcaPolicy) {
133                 return tcaPolicy.getMetricsPerFunctionalRole();
134             }
135         };
136     }
137
138     /**
139      * {@link Function} that extracts {@link MetricsPerFunctionalRole#getFunctionalRole()} from
140      * {@link MetricsPerFunctionalRole}
141      *
142      * @return Functional role or a Metrics Per Functional Role object
143      */
144     public static Function<MetricsPerFunctionalRole, String> tcaFunctionalRoleExtractorFunction() {
145         return new Function<MetricsPerFunctionalRole, String>() {
146             @Override
147             public String apply(@Nonnull MetricsPerFunctionalRole metricsPerFunctionalRole) {
148                 return metricsPerFunctionalRole.getFunctionalRole();
149             }
150         };
151     }
152
153
154     /**
155      * Extracts {@link TCAPolicy} Functional Roles
156      *
157      * @param tcaPolicy TCA Policy
158      * @return List of functional Roles in the tca Policy
159      */
160     public static List<String> getPolicyFunctionalRoles(@Nonnull final TCAPolicy tcaPolicy) {
161         final List<MetricsPerFunctionalRole> metricsPerFunctionalRoles =
162                 tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
163
164         return Lists.transform(metricsPerFunctionalRoles, tcaFunctionalRoleExtractorFunction());
165     }
166
167     /**
168      * A {@link Supplier} which caches {@link TCAPolicy} Functional Roles as they are not expected to
169      * change during runtime
170      *
171      * @param tcaPolicy TCA Policy
172      * @return a Supplier that memoize the Functional roles
173      */
174     public static Supplier<List<String>> getPolicyFunctionalRoleSupplier(@Nonnull final TCAPolicy tcaPolicy) {
175         return Suppliers.memoize(new Supplier<List<String>>() {
176             @Override
177             public List<String> get() {
178                 return getPolicyFunctionalRoles(tcaPolicy);
179             }
180         });
181     }
182
183
184     /**
185      * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Functional Role and Threshold Field path
186      *
187      * @param tcaPolicy TCA Policy
188      * @return A table with Keys of functional role and field path containing List of threshold as values
189      */
190     public static Table<String, String, List<Threshold>> getPolicyFRThresholdsTable(final TCAPolicy tcaPolicy) {
191         final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
192         for (MetricsPerFunctionalRole metricsPerFunctionalRole : tcaPolicy.getMetricsPerFunctionalRole()) {
193             final String functionalRole = metricsPerFunctionalRole.getFunctionalRole();
194             final List<Threshold> thresholds = metricsPerFunctionalRole.getThresholds();
195             for (Threshold threshold : thresholds) {
196                 final List<Threshold> existingThresholds = domainFRTable.get(functionalRole, threshold.getFieldPath());
197                 if (existingThresholds == null) {
198                     final LinkedList<Threshold> newThresholdList = new LinkedList<>();
199                     newThresholdList.add(threshold);
200                     domainFRTable.put(functionalRole, threshold.getFieldPath(), newThresholdList);
201                 } else {
202                     domainFRTable.get(functionalRole, threshold.getFieldPath()).add(threshold);
203                 }
204             }
205         }
206         return domainFRTable;
207     }
208
209
210     /**
211      * A {@link Supplier} which caches Policy Functional Role and Threshold Field Path Thresholds lookup table
212      *
213      * @param tcaPolicy TCA Policy
214      * @return Cached Supplier for table with Keys of functional role and field path containing thresholds as values
215      */
216     public static Supplier<Table<String, String, List<Threshold>>> getPolicyFRThresholdsTableSupplier
217     (final TCAPolicy tcaPolicy) {
218         return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
219             @Override
220             public Table<String, String, List<Threshold>> get() {
221                 return getPolicyFRThresholdsTable(tcaPolicy);
222             }
223         });
224     }
225
226
227     /**
228      * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
229      * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyFunctionalRoleFilter}s to
230      * filter out messages which does not match policy domain or functional role
231      *
232      * @param cefMessage CEF Message
233      * @param tcaPolicy TCA Policy
234      * @return Message Process Context after processing filter chain
235      */
236     public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
237                                                           @Nonnull final TCAPolicy tcaPolicy) {
238
239         final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
240         final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
241         final TCACEFPolicyFunctionalRoleFilter functionalRoleFilter = new TCACEFPolicyFunctionalRoleFilter();
242         // Create a list of message processors
243         final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
244                 ImmutableList.of(jsonProcessor, domainFilter, functionalRoleFilter);
245         final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
246         // Create a message processors chain
247         final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
248                 new GenericMessageChainProcessor<>(messageProcessors, processorContext);
249         // process chain
250         return tcaProcessingChain.processChain();
251     }
252
253
254     /**
255      * Extracts json path values for given json Field Paths from using Json path notation. Assumes
256      * that values extracted are always long
257      *
258      * @param message CEF Message
259      * @param jsonFieldPaths Json Field Paths
260      * @return Map containing key as json path and values as values associated with that json path
261      */
262     public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
263             jsonFieldPaths) {
264
265         final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>();
266         final DocumentContext documentContext = JsonPath.parse(message);
267
268         for (String jsonFieldPath : jsonFieldPaths) {
269             final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() {
270             });
271             // If Json Field Values are not or empty
272             if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
273                 // Filter out all null values in the filed values list
274                 final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
275                         Predicates.<Long>notNull()));
276                 // If there are non null values put them in the map
277                 if (!nonNullValues.isEmpty()) {
278                     jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
279                 }
280             }
281         }
282
283         return jsonFieldPathMap;
284     }
285
286     /**
287      * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
288      * it applies threshold in order of their severity and record the first threshold per message field path
289      *
290      * @param messageFieldValues Field Path Values extracted from CEF Message
291      * @param fieldThresholds Policy Thresholds for Field Path
292      * @return Optional of violated threshold for a field path
293      */
294     public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold>
295             fieldThresholds) {
296         // order thresholds by severity
297         Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
298         // Now apply each threshold to field values
299         for (Threshold fieldThreshold : fieldThresholds) {
300             for (Long messageFieldValue : messageFieldValues) {
301                 final Boolean isThresholdViolated =
302                         fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue());
303                 if (isThresholdViolated) {
304                     final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
305                     violatedThreshold.setActualFieldValue(messageFieldValue);
306                     return Optional.of(violatedThreshold);
307                 }
308             }
309         }
310         return Optional.absent();
311     }
312
313     /**
314      * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
315      * Grabs first highest priority violated threshold
316      *
317      * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
318      * @return First Highest priority violated threshold
319      */
320     public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
321
322         final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
323
324         if (violatedThresholds.size() == 1) {
325             return violatedThresholds.get(0);
326         }
327         Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
328         // Just grab the first violated threshold with highest priority
329         return violatedThresholds.get(0);
330     }
331
332
333     /**
334      * Creates {@link MetricsPerFunctionalRole} object which contains violated thresholds
335      *
336      * @param tcaPolicy TCA Policy
337      * @param violatedThreshold Violated thresholds
338      * @param functionalRole Functional Role
339      *
340      * @return MetricsPerFunctionalRole object containing one highest severity violated threshold
341      */
342     public static MetricsPerFunctionalRole createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
343                                                                  @Nonnull final Threshold violatedThreshold,
344                                                                  @Nonnull final String functionalRole) {
345
346         final ArrayList<MetricsPerFunctionalRole> metricsPerFunctionalRoles = newArrayList(
347                 Iterables.filter(tcaPolicy.getMetricsPerFunctionalRole(), new Predicate<MetricsPerFunctionalRole>() {
348                     @Override
349                     public boolean apply(@Nonnull MetricsPerFunctionalRole metricsPerFunctionalRole) {
350                         return metricsPerFunctionalRole.getFunctionalRole().equals(functionalRole);
351                     }
352                 }));
353         // TCA policy must have only one metrics role per functional role
354         if (metricsPerFunctionalRoles.size() == 1) {
355             final MetricsPerFunctionalRole violatedMetrics =
356                     MetricsPerFunctionalRole.copy(metricsPerFunctionalRoles.get(0));
357             violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
358             return violatedMetrics;
359         } else {
360             final String errorMessage = String.format("TCA Policy must contain functional Role: %s", functionalRole);
361             throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
362         }
363     }
364
365     /**
366      * Computes threshold violations
367      *
368      * @param processorContext Filtered processor Context
369      * @return processor context with any threshold violations
370      */
371     public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
372         final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
373         return policyThresholdsProcessor.apply(processorContext);
374     }
375
376
377     /**
378      * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
379      * formats
380      *
381      * @param processorContextWithViolations processor context which has TCA violations
382      * @param tcaAppName tca app name
383      * @param isAlertInCEFFormat determines if output alert is in CEF format
384      *
385      * @return TCA Alert String
386      *
387      * @throws JsonProcessingException If alert cannot be parsed into JSON String
388      */
389     public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
390                                               final String tcaAppName,
391                                               final Boolean isAlertInCEFFormat) throws JsonProcessingException {
392         if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
393             final EventListener eventListenerWithViolations =
394                     addThresholdViolationFields(processorContextWithViolations);
395             final String alertString = writeValueAsString(eventListenerWithViolations);
396             LOG.debug("Created alert in CEF Format: {}", alertString);
397             return alertString;
398         } else {
399             final TCAVESResponse newTCAVESResponse =
400                     createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
401             final String alertString = writeValueAsString(newTCAVESResponse);
402             LOG.debug("Created alert in Non CEF Format: {}", alertString);
403             return alertString;
404         }
405     }
406
407     /**
408      * Adds threshold violation fields to {@link EventListener}
409      *
410      * @param processorContextWithViolations processor context that contains violations
411      * @return event listener with threshold crossing alert fields populated
412      */
413     public static EventListener addThresholdViolationFields(
414             final TCACEFProcessorContext processorContextWithViolations) {
415
416         final MetricsPerFunctionalRole metricsPerFunctionalRole =
417                 processorContextWithViolations.getMetricsPerFunctionalRole();
418         // confirm violations are indeed present
419         if (metricsPerFunctionalRole == null) {
420             final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
421             throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
422         }
423
424         // get violated threshold
425         final Threshold violatedThreshold = metricsPerFunctionalRole.getThresholds().get(0);
426         final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
427         final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
428
429         // create new threshold crossing alert fields
430         final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
431         thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
432         thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
433         thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
434         thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
435         thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
436         thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
437         thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
438         thresholdCrossingAlertFields.setElementType(commonEventHeader.getFunctionalRole());
439
440         // create new performance count
441         final PerformanceCounter performanceCounter = new PerformanceCounter();
442         performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
443         performanceCounter.setName(violatedThreshold.getFieldPath());
444         performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
445         performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
446
447         // set additional parameters for threshold crossing alert fields
448         thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
449
450         // add threshold crossing fields to existing event listener
451         eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
452
453         return eventListener;
454     }
455
456     /**
457      * Converts {@link EventSeverity} to {@link Criticality}
458      *
459      * @param eventSeverity event severity
460      *
461      * @return performance counter criticality
462      */
463     private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
464         switch (eventSeverity) {
465             case CRITICAL:
466                 return Criticality.CRIT;
467             case MAJOR:
468                 return Criticality.MAJ;
469             default:
470                 return Criticality.UNKNOWN;
471         }
472     }
473
474     /**
475      * Creates {@link TCAVESResponse} object
476      *
477      * @param processorContext processor Context with violations
478      * @param tcaAppName TCA App Name
479      *
480      * @return TCA VES Response Message
481      */
482     public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
483                                                          final String tcaAppName) {
484
485         final MetricsPerFunctionalRole metricsPerFunctionalRole = processorContext.getMetricsPerFunctionalRole();
486         // confirm violations are indeed present
487         if (metricsPerFunctionalRole == null) {
488             final String errorMessage = "No violations metrics. Unable to create VES Response";
489             throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
490         }
491
492         final String functionalRole = metricsPerFunctionalRole.getFunctionalRole();
493         final Threshold violatedThreshold = metricsPerFunctionalRole.getThresholds().get(0);
494         final EventListener eventListener = processorContext.getCEFEventListener();
495         final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
496
497         final TCAVESResponse tcavesResponse = new TCAVESResponse();
498         // ClosedLoopControlName included in the DCAE configuration Policy
499         tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
500         // version included in the DCAE configuration Policy
501         tcavesResponse.setVersion(violatedThreshold.getVersion());
502         // Generate a UUID for this output message
503         tcavesResponse.setRequestID(UUID.randomUUID().toString());
504         // commonEventHeader.startEpochMicrosec from the received VES measurementsForVfScaling message
505         tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
506         // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
507         // TODO: Find out how to get this field
508         tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
509
510         final AAI aai = new AAI();
511         tcavesResponse.setAai(aai);
512
513         // vLoadBalancer specific settings
514         if (isFunctionalRoleVLoadBalancer(functionalRole)) {
515             // Hard Coded - "VM"
516             tcavesResponse.setTargetType(AnalyticsConstants.LOAD_BALANCER_TCA_VES_RESPONSE_TARGET_TYPE);
517             // Hard Coded - "vserver.vserver-name"
518             tcavesResponse.setTarget(AnalyticsConstants.LOAD_BALANCER_TCA_VES_RESPONSE_TARGET);
519             aai.setGenericServerId(commonEventHeader.getReportingEntityName());
520         } else {
521             // Hard Coded - "VNF"
522             tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_TARGET_TYPE);
523             // Hard Coded - "generic-vnf.vnf-id"
524             tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_TARGET);
525             // commonEventHeader.reportingEntityName from the received VES measurementsForVfScaling message (value for
526             // the data element used in A&AI)
527             aai.setGenericVNFId(commonEventHeader.getReportingEntityName());
528         }
529
530         // Hard Coded - "DCAE"
531         tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
532         // policyScope included in the DCAE configuration Policy
533         tcavesResponse.setPolicyScope(metricsPerFunctionalRole.getPolicyScope());
534         // policyName included in the DCAE configuration Policy
535         tcavesResponse.setPolicyName(metricsPerFunctionalRole.getPolicyName());
536         // policyVersion included in the DCAE configuration Policy
537         tcavesResponse.setPolicyVersion(metricsPerFunctionalRole.getPolicyVersion());
538         // Hard Coded - "ONSET"
539         tcavesResponse.setClosedLoopEventStatus(AnalyticsConstants.TCA_VES_RESPONSE_CLOSED_LOOP_EVENT_STATUS);
540
541         return tcavesResponse;
542     }
543
544     /**
545      * Determines if Functional Role is vLoadBalancer
546      *
547      * @param functionalRole functional Role to check
548      *
549      * @return return true if functional role is for vLoadBalancer
550      */
551     private static boolean isFunctionalRoleVLoadBalancer(final String functionalRole) {
552         return functionalRole.equals(AnalyticsConstants.LOAD_BALANCER_FUNCTIONAL_ROLE);
553     }
554
555
556     /**
557      * Extract Domain and functional Role from processor context if present
558      *
559      * @param processorContext processor context
560      * @return Tuple of domain and functional role
561      */
562     public static Pair<String, String> getDomainAndFunctionalRole(@Nullable final TCACEFProcessorContext
563                                                                           processorContext) {
564
565         String domain = null;
566         String functionalRole = null;
567
568         if (processorContext != null &&
569                 processorContext.getCEFEventListener() != null &&
570                 processorContext.getCEFEventListener().getEvent() != null &&
571                 processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
572             final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
573                     .getCommonEventHeader();
574
575             if (commonEventHeader.getDomain() != null) {
576                 domain = commonEventHeader.getDomain();
577             }
578
579             if (commonEventHeader.getFunctionalRole() != null) {
580                 functionalRole = commonEventHeader.getFunctionalRole();
581             }
582
583         }
584
585         return new ImmutablePair<>(domain, functionalRole);
586
587     }
588
589     /**
590      * Creates {@link TCAPolicy} Metrics per Functional Role list
591      *
592      * @param functionalRolesMap Map containing functional Roles as key and corresponding values
593      *
594      * @return List of {@link MetricsPerFunctionalRole}
595      */
596     public static List<MetricsPerFunctionalRole> createTCAPolicyMetricsPerFunctionalRoleList(
597             final Map<String, Map<String, String>> functionalRolesMap) {
598
599         // create a new metrics per functional role list
600         final List<MetricsPerFunctionalRole> metricsPerFunctionalRoles = new LinkedList<>();
601
602         for (Map.Entry<String, Map<String, String>> functionalRolesEntry : functionalRolesMap.entrySet()) {
603
604             // create new metrics per functional role instance
605             final MetricsPerFunctionalRole newMetricsPerFunctionalRole =
606                     createNewMetricsPerFunctionalRole(functionalRolesEntry);
607             metricsPerFunctionalRoles.add(newMetricsPerFunctionalRole);
608
609             // determine all threshold related values
610             final Map<String, String> thresholdsValuesMaps =
611                     filterMapByKeyNamePrefix(functionalRolesEntry.getValue(),
612                             AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
613
614             // create a map of all threshold values
615             final Map<String, Map<String, String>> thresholdsMap =
616                     extractSubTree(thresholdsValuesMaps, 1, 2,
617                             AnalyticsConstants.TCA_POLICY_DELIMITER);
618
619             // add thresholds to nmetrics per functional roles threshold list
620             for (Map<String, String> thresholdMap : thresholdsMap.values()) {
621                 newMetricsPerFunctionalRole.getThresholds().add(createNewThreshold(thresholdMap));
622             }
623
624         }
625
626         return metricsPerFunctionalRoles;
627     }
628
629     /**
630      * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
631      *
632      * @param thresholdMap threshold map with threshold values
633      *
634      * @return new instance of TCA Policy Threshold
635      */
636     public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
637         final Threshold threshold = new Threshold();
638         threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
639         threshold.setVersion(thresholdMap.get("policy.version"));
640         threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
641         threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
642         threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
643         threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
644         return threshold;
645     }
646
647     /**
648      * Create new {@link MetricsPerFunctionalRole} instance with policy Name, policy Version and policy Scope
649      * extracted from given functionalRolesEntry
650      *
651      * @param functionalRolesEntry Functional Role Entry
652      *
653      * @return new instance of MetricsPerFunctionalRole
654      */
655     public static MetricsPerFunctionalRole createNewMetricsPerFunctionalRole(
656             final Map.Entry<String, Map<String, String>> functionalRolesEntry) {
657         // determine functional Role
658         final String functionalRole = functionalRolesEntry.getKey();
659         // determine functional Role thresholds
660         final Map<String, String> metricsPerFunctionalRoleThresholdsMap = functionalRolesEntry.getValue();
661         final MetricsPerFunctionalRole metricsPerFunctionalRole = new MetricsPerFunctionalRole();
662         final List<Threshold> thresholds = new LinkedList<>();
663         metricsPerFunctionalRole.setThresholds(thresholds);
664         metricsPerFunctionalRole.setFunctionalRole(functionalRole);
665         // bind policyName, policyVersion and policyScope
666         metricsPerFunctionalRole.setPolicyName(metricsPerFunctionalRoleThresholdsMap.get("policyName"));
667         metricsPerFunctionalRole.setPolicyVersion(metricsPerFunctionalRoleThresholdsMap.get("policyVersion"));
668         metricsPerFunctionalRole.setPolicyScope(metricsPerFunctionalRoleThresholdsMap.get("policyScope"));
669         return metricsPerFunctionalRole;
670     }
671
672     /**
673      * Converts a flattened key/value map which has keys delimited by a given delimiter.
674      * The start Index and end index extract the sub-key value and returns a new map containing
675      * sub-keys and values.
676      *
677      * @param actualMap actual Map
678      * @param startIndex start index
679      * @param endIndex end index
680      * @param delimiter delimiter
681      *
682      * @return Map with new sub tree map
683      */
684     public static Map<String, Map<String, String>> extractSubTree(
685             final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
686
687         final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
688
689         // iterate over actual map entries
690         for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
691             final String actualMapKey = actualMapEntry.getKey();
692             final String actualMapValue = actualMapEntry.getValue();
693
694             // determine delimiter start and end index
695             final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
696             final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
697             final int keyLength = actualMapKey.length();
698
699             // extract sub-tree map
700             if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
701                 final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
702                 final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
703                 final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
704                 if (existingThresholdMap == null) {
705                     Map<String, String> newThresholdMap = new LinkedHashMap<>();
706                     newThresholdMap.put(subMapKey, actualMapValue);
707                     subTreeMap.put(thresholdKey, newThresholdMap);
708                 } else {
709                     existingThresholdMap.put(subMapKey, actualMapValue);
710                 }
711
712             }
713         }
714
715         return subTreeMap;
716
717     }
718
719
720     /**
721      * Provides a view of underlying map that filters out entries with keys starting with give prefix
722      *
723      * @param actualMap Target map that needs to be filtered
724      * @param keyNamePrefix key prefix
725      *
726      * @return a view of actual map which only show entries which have give prefix
727      */
728     public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
729                                                                final String keyNamePrefix) {
730         return Maps.filterKeys(actualMap,
731                 new Predicate<String>() {
732                     @Override
733                     public boolean apply(@Nullable String key) {
734                         return key != null && key.startsWith(keyNamePrefix);
735                     }
736                 });
737     }
738
739
740     /**
741      * Creates Quartz Scheduler
742      *
743      * @param pollingIntervalMS polling interval
744      * @param stdSchedulerFactory Quartz standard schedule factory instance
745      * @param quartzPublisherPropertiesFileName quartz properties file name
746      * @param jobDataMap job Data map
747      * @param quartzJobClass Quartz Job Class
748      * @param quartzJobName Quartz Job Name
749      * @param quartzTriggerName Quartz Trigger name
750      *
751      * @param <T> An implementation of Quartz {@link Job} interface
752      * @return Configured Quartz Scheduler
753      *
754      * @throws SchedulerException expection if unable to create to Quartz Scheduler
755      */
756     public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
757             final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
758             final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
759             final String quartzTriggerName) throws SchedulerException {
760
761         // Initialize a new Quartz Standard scheduler
762         LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
763                 quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
764         final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
765                 quartzPublisherPropertiesFileName, new Properties());
766         stdSchedulerFactory.initialize(quartzProperties);
767         final Scheduler scheduler = stdSchedulerFactory.getScheduler();
768
769         // Create a new job detail
770         final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
771                 AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
772
773         // Create a new scheduling builder
774         final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
775                 .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
776                 .repeatForever(); // repeats while worker is running
777
778         // Create a trigger for the TCA Publisher Job
779         final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
780                 .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
781                 .startNow() // job starts right away
782                 .withSchedule(simpleScheduleBuilder).build();
783
784         scheduler.scheduleJob(jobDetail, simpleTrigger);
785         LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());
786         return scheduler;
787     }
788
789
790
791 }