dd37aa2f233047db31f196739b689119f132bc88
[dcaegen2/analytics/tca.git] / dcae-analytics-tca / src / main / java / org / openecomp / dcae / apod / analytics / tca / utils / TCAUtils.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.tca.utils;\r
22 \r
23 import com.fasterxml.jackson.core.JsonProcessingException;\r
24 import com.fasterxml.jackson.databind.JsonNode;\r
25 import com.google.common.base.Function;\r
26 import com.google.common.base.Optional;\r
27 import com.google.common.base.Predicate;\r
28 import com.google.common.base.Predicates;\r
29 import com.google.common.base.Supplier;\r
30 import com.google.common.base.Suppliers;\r
31 import com.google.common.collect.HashBasedTable;\r
32 import com.google.common.collect.ImmutableList;\r
33 import com.google.common.collect.ImmutableMap;\r
34 import com.google.common.collect.Iterables;\r
35 import com.google.common.collect.Lists;\r
36 import com.google.common.collect.Maps;\r
37 import com.google.common.collect.Table;\r
38 import com.jayway.jsonpath.DocumentContext;\r
39 import com.jayway.jsonpath.JsonPath;\r
40 import com.jayway.jsonpath.TypeRef;\r
41 import org.apache.commons.lang3.StringUtils;\r
42 import org.apache.commons.lang3.tuple.ImmutablePair;\r
43 import org.apache.commons.lang3.tuple.Pair;\r
44 import org.openecomp.dcae.apod.analytics.aai.service.AAIEnrichmentClient;\r
45 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
46 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;\r
47 import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;\r
48 import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;\r
49 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;\r
50 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;\r
51 import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;\r
52 import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;\r
53 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;\r
54 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;\r
55 import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;\r
56 import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;\r
57 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;\r
58 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;\r
59 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;\r
60 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;\r
61 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;\r
62 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;\r
63 import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;\r
64 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;\r
65 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;\r
66 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;\r
67 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;\r
68 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;\r
69 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;\r
70 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;\r
71 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;\r
72 import org.quartz.Job;\r
73 import org.quartz.JobBuilder;\r
74 import org.quartz.JobDataMap;\r
75 import org.quartz.JobDetail;\r
76 import org.quartz.Scheduler;\r
77 import org.quartz.SchedulerException;\r
78 import org.quartz.SimpleScheduleBuilder;\r
79 import org.quartz.SimpleTrigger;\r
80 import org.quartz.TriggerBuilder;\r
81 import org.quartz.impl.StdSchedulerFactory;\r
82 import org.slf4j.Logger;\r
83 import org.slf4j.LoggerFactory;\r
84 \r
85 import java.io.IOException;\r
86 import java.util.ArrayList;\r
87 import java.util.Collections;\r
88 import java.util.Comparator;\r
89 import java.util.Date;\r
90 import java.util.HashMap;\r
91 import java.util.Iterator;\r
92 import java.util.LinkedHashMap;\r
93 import java.util.LinkedList;\r
94 import java.util.List;\r
95 import java.util.Map;\r
96 import java.util.Properties;\r
97 import java.util.Set;\r
98 import java.util.SortedMap;\r
99 import java.util.TreeMap;\r
100 import java.util.UUID;\r
101 \r
102 import javax.annotation.Nonnull;\r
103 import javax.annotation.Nullable;\r
104 \r
105 import static com.google.common.collect.Lists.newArrayList;\r
106 import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;\r
107 \r
/**
 * Utility helper methods for the TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to reuse its
 * pre-configured Json Object Mapper, which understands serialization and deserialization of CEF Messages
 * and TCA Policy
 *
 * @author Rajiv Singla . Creation Date: 10/24/2016.
 */
115 public abstract class TCAUtils extends AnalyticsModelJsonUtils {\r
116 \r
117     private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);\r
118 \r
119     /**\r
120      * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,\r
121      * WARNING )\r
122      */\r
123     private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {\r
124         @Override\r
125         public int compare(Threshold threshold1, Threshold threshold2) {\r
126             return threshold1.getSeverity().compareTo(threshold2.getSeverity());\r
127         }\r
128     };\r
129 \r
130     /**\r
131      * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}\r
132      *\r
133      * @return TCA Policy Metrics Per Event Name list\r
134      */\r
135     public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {\r
136         return new Function<TCAPolicy, List<MetricsPerEventName>>() {\r
137             @Nullable\r
138             @Override\r
139             public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {\r
140                 return tcaPolicy.getMetricsPerEventName();\r
141             }\r
142         };\r
143     }\r
144 \r
145     /**\r
146      * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from\r
147      * {@link MetricsPerEventName}\r
148      *\r
149      * @return Event Names or a Metrics Per Event Name object\r
150      */\r
151     public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {\r
152         return new Function<MetricsPerEventName, String>() {\r
153             @Override\r
154             public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {\r
155                 return metricsPerEventName.getEventName();\r
156             }\r
157         };\r
158     }\r
159 \r
160 \r
161     /**\r
162      * Extracts {@link TCAPolicy} Event Names\r
163      *\r
164      * @param tcaPolicy TCA Policy\r
165      * @return List of event names in the TCA Policy\r
166      */\r
167     public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {\r
168         final List<MetricsPerEventName> metricsPerEventNames =\r
169                 tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);\r
170 \r
171         return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());\r
172     }\r
173 \r
174     /**\r
175      * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to\r
176      * change during runtime\r
177      *\r
178      * @param tcaPolicy TCA Policy\r
179      * @return a Supplier that memoize the TCA Policy event names\r
180      */\r
181     public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {\r
182         return Suppliers.memoize(new Supplier<List<String>>() {\r
183             @Override\r
184             public List<String> get() {\r
185                 return getPolicyEventNames(tcaPolicy);\r
186             }\r
187         });\r
188     }\r
189 \r
190 \r
191     /**\r
192      * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path\r
193      *\r
194      * @param tcaPolicy TCA Policy\r
195      * @return A table with Keys of event name and field path containing List of threshold as values\r
196      */\r
197     public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {\r
198         final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();\r
199         for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {\r
200             final String eventName = metricsPerEventName.getEventName();\r
201             final List<Threshold> thresholds = metricsPerEventName.getThresholds();\r
202             for (Threshold threshold : thresholds) {\r
203                 final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());\r
204                 if (existingThresholds == null) {\r
205                     final LinkedList<Threshold> newThresholdList = new LinkedList<>();\r
206                     newThresholdList.add(threshold);\r
207                     domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);\r
208                 } else {\r
209                     domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);\r
210                 }\r
211             }\r
212         }\r
213         return domainFRTable;\r
214     }\r
215 \r
216 \r
217     /**\r
218      * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table\r
219      *\r
220      * @param tcaPolicy TCA Policy\r
221      * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values\r
222      */\r
223     public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier\r
224     (final TCAPolicy tcaPolicy) {\r
225         return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {\r
226             @Override\r
227             public Table<String, String, List<Threshold>> get() {\r
228                 return getPolicyEventNameThresholdsTable(tcaPolicy);\r
229             }\r
230         });\r
231     }\r
232 \r
233 \r
234     /**\r
235      * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},\r
236      * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to\r
237      * filter out messages which does not match policy domain or event Name\r
238      *\r
239      * @param cefMessage CEF Message\r
240      * @param tcaPolicy TCA Policy\r
241      * @return Message Process Context after processing filter chain\r
242      */\r
243     public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,\r
244                                                           @Nonnull final TCAPolicy tcaPolicy) {\r
245 \r
246         final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();\r
247         final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();\r
248         final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();\r
249         // Create a list of message processors\r
250         final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =\r
251                 ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);\r
252         final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);\r
253         // Create a message processors chain\r
254         final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =\r
255                 new GenericMessageChainProcessor<>(messageProcessors, processorContext);\r
256         // process chain\r
257         return tcaProcessingChain.processChain();\r
258     }\r
259 \r
260 \r
261     /**\r
262      * Extracts json path values for given json Field Paths from using Json path notation. Assumes\r
263      * that values extracted are always long\r
264      *\r
265      * @param message CEF Message\r
266      * @param jsonFieldPaths Json Field Paths\r
267      * @return Map containing key as json path and values as values associated with that json path\r
268      */\r
269     public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>\r
270             jsonFieldPaths) {\r
271 \r
272         final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>();\r
273         final DocumentContext documentContext = JsonPath.parse(message);\r
274 \r
275         for (String jsonFieldPath : jsonFieldPaths) {\r
276             final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() {\r
277             });\r
278             // If Json Field Values are not or empty\r
279             if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {\r
280                 // Filter out all null values in the filed values list\r
281                 final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,\r
282                         Predicates.<Long>notNull()));\r
283                 // If there are non null values put them in the map\r
284                 if (!nonNullValues.isEmpty()) {\r
285                     jsonFieldPathMap.put(jsonFieldPath, nonNullValues);\r
286                 }\r
287             }\r
288         }\r
289 \r
290         return jsonFieldPathMap;\r
291     }\r
292 \r
293     /**\r
294      * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path\r
295      * it applies threshold in order of their severity and record the first threshold per message field path\r
296      *\r
297      * @param messageFieldValues Field Path Values extracted from CEF Message\r
298      * @param fieldThresholds Policy Thresholds for Field Path\r
299      * @return Optional of violated threshold for a field path\r
300      */\r
301     public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold>\r
302             fieldThresholds) {\r
303         // order thresholds by severity\r
304         Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);\r
305         // Now apply each threshold to field values\r
306         for (Threshold fieldThreshold : fieldThresholds) {\r
307             for (Long messageFieldValue : messageFieldValues) {\r
308                 final Boolean isThresholdViolated =\r
309                         fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue());\r
310                 if (isThresholdViolated) {\r
311                     final Threshold violatedThreshold = Threshold.copy(fieldThreshold);\r
312                     violatedThreshold.setActualFieldValue(messageFieldValue);\r
313                     return Optional.of(violatedThreshold);\r
314                 }\r
315             }\r
316         }\r
317         return Optional.absent();\r
318     }\r
319 \r
320     /**\r
321      * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.\r
322      * Grabs first highest priority violated threshold\r
323      *\r
324      * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds\r
325      * @return First Highest priority violated threshold\r
326      */\r
327     public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {\r
328 \r
329         final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());\r
330 \r
331         if (violatedThresholds.size() == 1) {\r
332             return violatedThresholds.get(0);\r
333         }\r
334         Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);\r
335         // Just grab the first violated threshold with highest priority\r
336         return violatedThresholds.get(0);\r
337     }\r
338 \r
339 \r
340     /**\r
341      * Creates {@link MetricsPerEventName} object which contains violated thresholds\r
342      *\r
343      * @param tcaPolicy TCA Policy\r
344      * @param violatedThreshold Violated thresholds\r
345      * @param eventName Event Name\r
346      *\r
347      * @return MetricsPerEventName object containing one highest severity violated threshold\r
348      */\r
349     public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,\r
350                                                             @Nonnull final Threshold violatedThreshold,\r
351                                                             @Nonnull final String eventName) {\r
352 \r
353         final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(\r
354                 Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {\r
355                     @Override\r
356                     public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {\r
357                         return metricsPerEventName.getEventName().equals(eventName);\r
358                     }\r
359                 }));\r
360         // TCA policy must have only one metrics per event Name\r
361         if (metricsPerEventNames.size() == 1) {\r
362             final MetricsPerEventName violatedMetrics =\r
363                     MetricsPerEventName.copy(metricsPerEventNames.get(0));\r
364             violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));\r
365             return violatedMetrics;\r
366         } else {\r
367             final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);\r
368             throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
369         }\r
370     }\r
371 \r
372     /**\r
373      * Computes threshold violations\r
374      *\r
375      * @param processorContext Filtered processor Context\r
376      * @return processor context with any threshold violations\r
377      */\r
378     public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {\r
379         final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();\r
380         return policyThresholdsProcessor.apply(processorContext);\r
381     }\r
382 \r
383 \r
384     /**\r
385      * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}\r
386      * formats\r
387      *\r
388      * @param processorContextWithViolations processor context which has TCA violations\r
389      * @param tcaAppName tca app name\r
390      * @param isAlertInCEFFormat determines if output alert is in CEF format\r
391      *\r
392      * @return TCA Alert String\r
393      *\r
394      * @throws JsonProcessingException If alert cannot be parsed into JSON String\r
395      */\r
396     public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,\r
397                                               final String tcaAppName,\r
398                                               final Boolean isAlertInCEFFormat) throws JsonProcessingException {\r
399         if (isAlertInCEFFormat != null && isAlertInCEFFormat) {\r
400             final EventListener eventListenerWithViolations =\r
401                     addThresholdViolationFields(processorContextWithViolations);\r
402             final String alertString = writeValueAsString(eventListenerWithViolations);\r
403             LOG.debug("Created alert in CEF Format: {}", alertString);\r
404             return alertString;\r
405         } else {\r
406             final TCAVESResponse newTCAVESResponse =\r
407                     createNewTCAVESResponse(processorContextWithViolations, tcaAppName);\r
408             final String alertString = writeValueAsString(newTCAVESResponse);\r
409             LOG.debug("Created alert in Non CEF Format: {}", alertString);\r
410             return alertString;\r
411         }\r
412     }\r
413 \r
414     /**\r
415      * Adds threshold violation fields to {@link EventListener}\r
416      *\r
417      * @param processorContextWithViolations processor context that contains violations\r
418      * @return event listener with threshold crossing alert fields populated\r
419      */\r
420     public static EventListener addThresholdViolationFields(\r
421             final TCACEFProcessorContext processorContextWithViolations) {\r
422 \r
423         final MetricsPerEventName metricsPerEventName =\r
424                 processorContextWithViolations.getMetricsPerEventName();\r
425         // confirm violations are indeed present\r
426         if (metricsPerEventName == null) {\r
427             final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";\r
428             throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
429         }\r
430 \r
431         // get violated threshold\r
432         final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);\r
433         final EventListener eventListener = processorContextWithViolations.getCEFEventListener();\r
434         final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();\r
435 \r
436         // create new threshold crossing alert fields\r
437         final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();\r
438         thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());\r
439         thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());\r
440         thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));\r
441         thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);\r
442         thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);\r
443         thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());\r
444         thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());\r
445         thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());\r
446 \r
447         // create new performance count\r
448         final PerformanceCounter performanceCounter = new PerformanceCounter();\r
449         performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));\r
450         performanceCounter.setName(violatedThreshold.getFieldPath());\r
451         performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());\r
452         performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());\r
453 \r
454         // set additional parameters for threshold crossing alert fields\r
455         thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));\r
456 \r
457         // add threshold crossing fields to existing event listener\r
458         eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);\r
459 \r
460         return eventListener;\r
461     }\r
462 \r
463     /**\r
464      * Converts {@link EventSeverity} to {@link Criticality}\r
465      *\r
466      * @param eventSeverity event severity\r
467      *\r
468      * @return performance counter criticality\r
469      */\r
470     private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {\r
471         switch (eventSeverity) {\r
472             case CRITICAL:\r
473                 return Criticality.CRIT;\r
474             case MAJOR:\r
475                 return Criticality.MAJ;\r
476             default:\r
477                 return Criticality.UNKNOWN;\r
478         }\r
479     }\r
480 \r
481     /**\r
482      * Creates {@link TCAVESResponse} object\r
483      *\r
484      * @param processorContext processor Context with violations\r
485      * @param tcaAppName TCA App Name\r
486      *\r
487      * @return TCA VES Response Message\r
488      */\r
489     public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,\r
490                                                          final String tcaAppName) {\r
491 \r
492         final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();\r
493         // confirm violations are indeed present\r
494         if (metricsPerEventName == null) {\r
495             final String errorMessage = "No violations metrics. Unable to create VES Response";\r
496             throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));\r
497         }\r
498 \r
499         final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);\r
500         final EventListener eventListener = processorContext.getCEFEventListener();\r
501         final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();\r
502 \r
503         final TCAVESResponse tcavesResponse = new TCAVESResponse();\r
504         // ClosedLoopControlName included in the DCAE configuration Policy\r
505         tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());\r
506         // version included in the DCAE configuration Policy\r
507         tcavesResponse.setVersion(violatedThreshold.getVersion());\r
508         // Generate a UUID for this output message\r
509         tcavesResponse.setRequestID(UUID.randomUUID().toString());\r
510         // commonEventHeader.startEpochMicrosec from the received VES message\r
511         tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());\r
512         // commonEventHeader.lastEpochMicrosec from the received VES message for abated alerts\r
513         if (violatedThreshold.getClosedLoopEventStatus() == ClosedLoopEventStatus.ABATED) {\r
514             tcavesResponse.setClosedLoopAlarmEnd(commonEventHeader.getLastEpochMicrosec());\r
515         }\r
516         // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot\r
517         tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);\r
518 \r
519         final AAI aai = new AAI();\r
520         tcavesResponse.setAai(aai);\r
521 \r
522         // VM specific settings\r
523         if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {\r
524             // Hard Coded - "VM"\r
525             tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);\r
526             // Hard Coded - "vserver.vserver-name"\r
527             tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);\r
528             // commonEventHeader.sourceName from the received VES message\r
529             aai.setGenericServerId(commonEventHeader.getSourceName());\r
530         } else {\r
531             // VNF specific settings\r
532             // Hard Coded - "VNF"\r
533             tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);\r
534             // Hard Coded - "generic-vnf.vnf-id"\r
535             tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);\r
536             // commonEventHeader.sourceName from the received VES message\r
537             aai.setGenericVNFId(commonEventHeader.getSourceName());\r
538         }\r
539 \r
540         // Hard Coded - "DCAE"\r
541         tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);\r
542         // policyScope included in the DCAE configuration Policy\r
543         tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());\r
544         // policyName included in the DCAE configuration Policy\r
545         tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());\r
546         // policyVersion included in the DCAE configuration Policy\r
547         tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());\r
548         // Extracted from violated threshold\r
549         tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());\r
550 \r
551         return tcavesResponse;\r
552     }\r
553 \r
554 \r
555     /**\r
556      * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert\r
557      *\r
558      * @param tcavesResponse alert\r
559      *\r
560      * @return control Loop Schema Type\r
561      */\r
562     public static ControlLoopSchemaType determineControlLoopSchemaType(final TCAVESResponse tcavesResponse) {\r
563         final AAI aai = tcavesResponse.getAai();\r
564         if (aai.getGenericServerId() != null) {\r
565             return ControlLoopSchemaType.VM;\r
566         } else {\r
567             return ControlLoopSchemaType.VNF;\r
568         }\r
569     }\r
570 \r
571     /**\r
572      * Determines {@link ControlLoopSchemaType} for given {@link TCAVESResponse} alert\r
573      *\r
574      * @param tcavesResponse {@link TCAVESResponse} TCA alert\r
575      *\r
576      * @return Source name\r
577      */\r
578     public static String determineSourceName(final TCAVESResponse tcavesResponse) {\r
579         final AAI aai = tcavesResponse.getAai();\r
580         if (aai.getGenericServerId() != null) {\r
581             return aai.getGenericServerId();\r
582         } else {\r
583             return aai.getGenericVNFId();\r
584         }\r
585     }\r
586 \r
587 \r
588     /**\r
589      * Extract Domain and Event Name from processor context if present\r
590      *\r
591      * @param processorContext processor context\r
592      * @return Tuple of domain and event Name\r
593      */\r
594     public static Pair<String, String> getDomainAndEventName(\r
595             @Nullable final TCACEFProcessorContext processorContext) {\r
596 \r
597         String domain = null;\r
598         String eventName = null;\r
599 \r
600         if (processorContext != null &&\r
601                 processorContext.getCEFEventListener() != null &&\r
602                 processorContext.getCEFEventListener().getEvent() != null &&\r
603                 processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {\r
604             final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()\r
605                     .getCommonEventHeader();\r
606 \r
607             if (commonEventHeader.getDomain() != null) {\r
608                 domain = commonEventHeader.getDomain().name();\r
609             }\r
610 \r
611             if (commonEventHeader.getEventName() != null) {\r
612                 eventName = commonEventHeader.getEventName();\r
613             }\r
614 \r
615         }\r
616 \r
617         return new ImmutablePair<>(domain, eventName);\r
618 \r
619     }\r
620 \r
621     /**\r
622      * Creates {@link TCAPolicy} Metrics per Event Name list\r
623      *\r
624      * @param eventNamesMap Map containing event Name as key and corresponding values\r
625      *\r
626      * @return List of {@link MetricsPerEventName}\r
627      */\r
628     public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(\r
629             final Map<String, Map<String, String>> eventNamesMap) {\r
630 \r
631         // create a new metrics per event Name list\r
632         final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();\r
633 \r
634         for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {\r
635 \r
636             // create new metrics per event Name instance\r
637             final MetricsPerEventName newMetricsPerEventName =\r
638                     createNewMetricsPerEventName(eventNamesEntry);\r
639             metricsPerEventNames.add(newMetricsPerEventName);\r
640 \r
641             // determine all threshold related values\r
642             final Map<String, String> thresholdsValuesMaps =\r
643                     filterMapByKeyNamePrefix(eventNamesEntry.getValue(),\r
644                             AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);\r
645 \r
646             // create a map of all threshold values\r
647             final Map<String, Map<String, String>> thresholdsMap =\r
648                     extractSubTree(thresholdsValuesMaps, 1, 2,\r
649                             AnalyticsConstants.TCA_POLICY_DELIMITER);\r
650 \r
651             // add thresholds to nmetrics per event Names threshold list\r
652             for (Map<String, String> thresholdMap : thresholdsMap.values()) {\r
653                 newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));\r
654             }\r
655 \r
656         }\r
657 \r
658         return metricsPerEventNames;\r
659     }\r
660 \r
661     /**\r
662      * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap\r
663      *\r
664      * @param thresholdMap threshold map with threshold values\r
665      *\r
666      * @return new instance of TCA Policy Threshold\r
667      */\r
668     public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {\r
669         final Threshold threshold = new Threshold();\r
670         threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));\r
671         threshold.setVersion(thresholdMap.get("policy.version"));\r
672         threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));\r
673         threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));\r
674         threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));\r
675         threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));\r
676         threshold.setClosedLoopEventStatus(\r
677                 ClosedLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));\r
678         return threshold;\r
679     }\r
680 \r
681     /**\r
682      * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope\r
683      * extracted from given eventNamesEntry\r
684      *\r
685      * @param eventNamesEntry Event Names Entry\r
686      *\r
687      * @return new instance of MetricsPerEventName\r
688      */\r
689     public static MetricsPerEventName createNewMetricsPerEventName(\r
690             final Map.Entry<String, Map<String, String>> eventNamesEntry) {\r
691         // determine event Name\r
692         final String eventName = eventNamesEntry.getKey();\r
693         // determine event Name thresholds\r
694         final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();\r
695         final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();\r
696         final List<Threshold> thresholds = new LinkedList<>();\r
697         metricsPerEventName.setThresholds(thresholds);\r
698         metricsPerEventName.setEventName(eventName);\r
699         // bind policyName, policyVersion, policyScope and closedLoopControlName\r
700         metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));\r
701         metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));\r
702         metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));\r
703         metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(\r
704                 metricsPerEventNameThresholdsMap.get("controlLoopSchemaType")));\r
705         return metricsPerEventName;\r
706     }\r
707 \r
708     /**\r
709      * Converts a flattened key/value map which has keys delimited by a given delimiter.\r
710      * The start Index and end index extract the sub-key value and returns a new map containing\r
711      * sub-keys and values.\r
712      *\r
713      * @param actualMap actual Map\r
714      * @param startIndex start index\r
715      * @param endIndex end index\r
716      * @param delimiter delimiter\r
717      *\r
718      * @return Map with new sub tree map\r
719      */\r
720     public static Map<String, Map<String, String>> extractSubTree(\r
721             final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {\r
722 \r
723         final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();\r
724 \r
725         // iterate over actual map entries\r
726         for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {\r
727             final String actualMapKey = actualMapEntry.getKey();\r
728             final String actualMapValue = actualMapEntry.getValue();\r
729 \r
730             // determine delimiter start and end index\r
731             final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);\r
732             final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);\r
733             final int keyLength = actualMapKey.length();\r
734 \r
735             // extract sub-tree map\r
736             if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {\r
737                 final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);\r
738                 final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);\r
739                 final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);\r
740                 if (existingThresholdMap == null) {\r
741                     Map<String, String> newThresholdMap = new LinkedHashMap<>();\r
742                     newThresholdMap.put(subMapKey, actualMapValue);\r
743                     subTreeMap.put(thresholdKey, newThresholdMap);\r
744                 } else {\r
745                     existingThresholdMap.put(subMapKey, actualMapValue);\r
746                 }\r
747 \r
748             }\r
749         }\r
750 \r
751         return subTreeMap;\r
752 \r
753     }\r
754 \r
755 \r
756     /**\r
757      * Provides a view of underlying map that filters out entries with keys starting with give prefix\r
758      *\r
759      * @param actualMap Target map that needs to be filtered\r
760      * @param keyNamePrefix key prefix\r
761      *\r
762      * @return a view of actual map which only show entries which have give prefix\r
763      */\r
764     public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,\r
765                                                                final String keyNamePrefix) {\r
766         return Maps.filterKeys(actualMap,\r
767                 new Predicate<String>() {\r
768                     @Override\r
769                     public boolean apply(@Nullable String key) {\r
770                         return key != null && key.startsWith(keyNamePrefix);\r
771                     }\r
772                 });\r
773     }\r
774 \r
775 \r
776     /**\r
777      * Creates Quartz Scheduler\r
778      *\r
779      * @param pollingIntervalMS polling interval\r
780      * @param stdSchedulerFactory Quartz standard schedule factory instance\r
781      * @param quartzPublisherPropertiesFileName quartz properties file name\r
782      * @param jobDataMap job Data map\r
783      * @param quartzJobClass Quartz Job Class\r
784      * @param quartzJobName Quartz Job Name\r
785      * @param quartzTriggerName Quartz Trigger name\r
786      *\r
787      * @param <T> An implementation of Quartz {@link Job} interface\r
788      * @return Configured Quartz Scheduler\r
789      *\r
790      * @throws SchedulerException exception if unable to create to Quartz Scheduler\r
791      */\r
792     public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,\r
793             final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,\r
794             final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,\r
795             final String quartzTriggerName) throws SchedulerException {\r
796 \r
797         // Initialize a new Quartz Standard scheduler\r
798         LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",\r
799                 quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);\r
800         final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(\r
801                 quartzPublisherPropertiesFileName, new Properties());\r
802         stdSchedulerFactory.initialize(quartzProperties);\r
803         final Scheduler scheduler = stdSchedulerFactory.getScheduler();\r
804 \r
805         // Create a new job detail\r
806         final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,\r
807                 AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();\r
808 \r
809         // Create a new scheduling builder\r
810         final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()\r
811                 .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule\r
812                 .repeatForever(); // repeats while worker is running\r
813 \r
814         // Create a trigger for the TCA Publisher Job\r
815         final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()\r
816                 .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)\r
817                 .startNow() // job starts right away\r
818                 .withSchedule(simpleScheduleBuilder).build();\r
819 \r
820         scheduler.scheduleJob(jobDetail, simpleTrigger);\r
821         LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());\r
822         return scheduler;\r
823     }\r
824 \r
825 \r
826     /**\r
827      * Does A&AI Enrichment for VM\r
828      *\r
829      * @param tcavesResponse Outgoing alert object\r
830      * @param aaiEnrichmentClient A&AI Enrichment client\r
831      * @param aaiVMEnrichmentAPIPath A&AI VM Enrichment API Path\r
832      * @param alertString alert String\r
833      * @param vmSourceName vm source name\r
834      */\r
835     public static void doAAIVMEnrichment(final TCAVESResponse tcavesResponse,\r
836                                          final AAIEnrichmentClient aaiEnrichmentClient,\r
837                                          final String aaiVMEnrichmentAPIPath,\r
838                                          final String alertString,\r
839                                          final String vmSourceName) {\r
840 \r
841         final String filterString = "vserver-name:EQUALS:" + vmSourceName;\r
842         final ImmutableMap<String, String> queryParams = ImmutableMap.of(\r
843                 "search-node-type", "vserver", "filter", filterString);\r
844 \r
845         // fetch vm object resource Link from A&AI\r
846         final String vmAAIResourceLinkDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
847                 aaiVMEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());\r
848         final String vmObjectResourceLink = getVMObjectResourceLink(vmAAIResourceLinkDetails);\r
849 \r
850         if (vmObjectResourceLink == null) {\r
851             LOG.warn("No A&AI Enrichment possible for alert message: {}.VM Object resource Link cannot be " +\r
852                     "determined for vmSourceName: {}.", alertString, vmSourceName);\r
853         } else {\r
854 \r
855             LOG.debug("Fetching VM A&AI Enrichment Details for VM Source Name: {}, Object resource Link: {}",\r
856                     vmSourceName, vmObjectResourceLink);\r
857 \r
858             // fetch vm A&AI Enrichment\r
859             final String vmEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
860                     vmObjectResourceLink, Collections.<String, String>emptyMap(), createAAIEnrichmentHeaders());\r
861 \r
862             // enrich AAI\r
863             enrichAAI(tcavesResponse.getAai(), vmEnrichmentDetails, alertString,\r
864                     AnalyticsConstants.AAI_VSERVER_KEY_PREFIX);\r
865         }\r
866 \r
867 \r
868     }\r
869 \r
870 \r
871     /**\r
872      * Does A&AI Enrichment for VNF\r
873      *\r
874      * @param tcavesResponse Outgoing alert object\r
875      * @param aaiEnrichmentClient A&AI Enrichment client\r
876      * @param aaiVNFEnrichmentAPIPath A&AI VNF Enrichment API Path\r
877      * @param alertString alert String\r
878      * @param vnfSourceName vnf source name\r
879      */\r
880     public static void doAAIVNFEnrichment(final TCAVESResponse tcavesResponse,\r
881                                           final AAIEnrichmentClient aaiEnrichmentClient,\r
882                                           final String aaiVNFEnrichmentAPIPath,\r
883                                           final String alertString,\r
884                                           final String vnfSourceName) {\r
885         final ImmutableMap<String, String> queryParams = ImmutableMap.of("vnf-name", vnfSourceName);\r
886 \r
887         // fetch vnf A&AI Enrichment\r
888         final String vnfEnrichmentDetails = aaiEnrichmentClient.getEnrichmentDetails(\r
889                 aaiVNFEnrichmentAPIPath, queryParams, createAAIEnrichmentHeaders());\r
890 \r
891         // enrich alert AAI\r
892         enrichAAI(tcavesResponse.getAai(), vnfEnrichmentDetails, alertString, AnalyticsConstants.AAI_VNF_KEY_PREFIX);\r
893     }\r
894 \r
895     /**\r
896      * Fetches VM Object Resource Link from A&AI Resource Link Json\r
897      *\r
898      * @param vmAAIResourceLinkDetails VM Object Resource Link from A&AI Resource Link Json\r
899      *\r
900      * @return object resource link String\r
901      */\r
902     private static String getVMObjectResourceLink(final String vmAAIResourceLinkDetails) {\r
903         if (StringUtils.isNotBlank(vmAAIResourceLinkDetails)) {\r
904             try {\r
905                 final JsonNode jsonNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(vmAAIResourceLinkDetails);\r
906                 final JsonNode resourceLinkJsonNode = jsonNode.findPath("resource-link");\r
907                 if (!resourceLinkJsonNode.isMissingNode()) {\r
908                     return resourceLinkJsonNode.asText();\r
909                 }\r
910             } catch (IOException e) {\r
911                 LOG.warn("Unable to determine VM Object link inside AAI Resource Link Response JSON: {}. Exception: {}",\r
912                         vmAAIResourceLinkDetails, e);\r
913             }\r
914         }\r
915         return null;\r
916     }\r
917 \r
918     /**\r
919      * Creates Http Headers for A&AI Enrichment client\r
920      *\r
921      * @return Http Headers Map for A&AI Enrichment client\r
922      */\r
923     private static Map<String, String> createAAIEnrichmentHeaders() {\r
924         final Map<String, String> aaiEnrichmentHeaders = new LinkedHashMap<>();\r
925         final String transactionId = Long.toString(new Date().getTime());\r
926         aaiEnrichmentHeaders.put("X-FromAppId", "dcae-analytics-tca");\r
927         aaiEnrichmentHeaders.put("X-TransactionId", transactionId);\r
928         aaiEnrichmentHeaders.put("Accept", "application/json");\r
929         aaiEnrichmentHeaders.put("Real-Time", "true");\r
930         aaiEnrichmentHeaders.put("Content-Type", "application/json");\r
931         return aaiEnrichmentHeaders;\r
932     }\r
933 \r
934 \r
935     /**\r
936      * Populates A&AI details retrieved from A&AI Enrichment API into Alerts A&AI Object\r
937      *\r
938      * @param preEnrichmentAAI A&AI Alert object which needs to be populated with A&AI Enrichment Details\r
939      * @param aaiEnrichmentDetails A&AI Enrichment API fetched JSON String\r
940      * @param alertString Alert String\r
941      * @param keyPrefix Key prefix that needs to be added to each fetched A&AI Enrichment record\r
942      */\r
943     private static void enrichAAI(final AAI preEnrichmentAAI, final String aaiEnrichmentDetails,\r
944                                   final String alertString, final String keyPrefix) {\r
945 \r
946         if (aaiEnrichmentDetails == null) {\r
947             LOG.warn("No A&AI Enrichment possible for AAI: {}. A&AI Enrichment details are absent." +\r
948                     "Skipping Enrichment for alert message:{}", preEnrichmentAAI, alertString);\r
949 \r
950         } else {\r
951 \r
952             final AAI enrichmentDetailsAAI = getEnrichmentDetailsAAI(aaiEnrichmentDetails);\r
953 \r
954             if (enrichmentDetailsAAI != null) {\r
955                 final Set<Map.Entry<String, Object>> enrichedAAIEntrySet =\r
956                         enrichmentDetailsAAI.getDynamicProperties().entrySet();\r
957                 final Map<String, Object> preEnrichmentAAIDynamicProperties = preEnrichmentAAI.getDynamicProperties();\r
958 \r
959                 // populate A&AI Enrichment details and add prefix to key\r
960                 for (Map.Entry<String, Object> enrichedAAIEntry : enrichedAAIEntrySet) {\r
961                     preEnrichmentAAIDynamicProperties.put(keyPrefix + enrichedAAIEntry.getKey(),\r
962                             enrichedAAIEntry.getValue());\r
963                 }\r
964 \r
965                 LOG.debug("A&AI Enrichment was completed successfully for alert message: {}. Enriched AAI: {}",\r
966                         alertString, preEnrichmentAAI);\r
967             } else {\r
968                 LOG.warn("No A&AI Enrichment possible for AAI: {}. Invalid A&AI Response: {}." +\r
969                                 "Skipping Enrichment for alert message: {}",\r
970                         preEnrichmentAAI, aaiEnrichmentDetails, alertString);\r
971             }\r
972         }\r
973 \r
974     }\r
975 \r
976     /**\r
977      * Creates a new A&AI object with only top level A&AI Enrichment details\r
978      *\r
979      * @param aaiEnrichmentDetails A&AI Enrichment details\r
980      *\r
981      * @return new A&AI with only top level A&AI Enrichment details\r
982      */\r
983     private static AAI getEnrichmentDetailsAAI(final String aaiEnrichmentDetails) {\r
984         try {\r
985             final JsonNode rootNode = ANALYTICS_MODEL_OBJECT_MAPPER.readTree(aaiEnrichmentDetails);\r
986             final Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();\r
987             while (fieldsIterator.hasNext()) {\r
988                 final Map.Entry<String, JsonNode> fieldEntry = fieldsIterator.next();\r
989                 final JsonNode jsonNode = fieldEntry.getValue();\r
990                 // remove all arrays, objects from A&AI Enrichment Json\r
991                 if (jsonNode.isPojo() || jsonNode.isObject() || jsonNode.isArray()) {\r
992                     fieldsIterator.remove();\r
993                 }\r
994             }\r
995             return ANALYTICS_MODEL_OBJECT_MAPPER.treeToValue(rootNode, AAI.class);\r
996         } catch (IOException e) {\r
997             LOG.error("Failed to Parse AAI Enrichment Details from JSON: {}, Exception: {}.", aaiEnrichmentDetails, e);\r
998         }\r
999         return null;\r
1000     }\r
1001 \r
1002 }\r