/*
 * ===============================LICENSE_START======================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============================LICENSE_END===========================================
 */
21 package org.openecomp.dcae.apod.analytics.tca.utils;
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Predicate;
27 import com.google.common.base.Predicates;
28 import com.google.common.base.Supplier;
29 import com.google.common.base.Suppliers;
30 import com.google.common.collect.HashBasedTable;
31 import com.google.common.collect.ImmutableList;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Maps;
35 import com.google.common.collect.Table;
36 import com.jayway.jsonpath.DocumentContext;
37 import com.jayway.jsonpath.JsonPath;
38 import com.jayway.jsonpath.TypeRef;
39 import org.apache.commons.lang3.StringUtils;
40 import org.apache.commons.lang3.tuple.ImmutablePair;
41 import org.apache.commons.lang3.tuple.Pair;
42 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
43 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
44 import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
45 import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
46 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;
47 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;
48 import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
49 import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;
50 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
51 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
52 import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
53 import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
54 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus;
55 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopSchemaType;
56 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
57 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
58 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
59 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
60 import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;
61 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
62 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
63 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
64 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
65 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
66 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyEventNameFilter;
67 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
68 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
69 import org.quartz.Job;
70 import org.quartz.JobBuilder;
71 import org.quartz.JobDataMap;
72 import org.quartz.JobDetail;
73 import org.quartz.Scheduler;
74 import org.quartz.SchedulerException;
75 import org.quartz.SimpleScheduleBuilder;
76 import org.quartz.SimpleTrigger;
77 import org.quartz.TriggerBuilder;
78 import org.quartz.impl.StdSchedulerFactory;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
82 import java.util.ArrayList;
83 import java.util.Collections;
84 import java.util.Comparator;
85 import java.util.Date;
86 import java.util.HashMap;
87 import java.util.LinkedHashMap;
88 import java.util.LinkedList;
89 import java.util.List;
91 import java.util.Properties;
93 import java.util.SortedMap;
94 import java.util.TreeMap;
95 import java.util.UUID;
97 import javax.annotation.Nonnull;
98 import javax.annotation.Nullable;
100 import static com.google.common.collect.Lists.newArrayList;
101 import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
/**
 * Utility helper methods for the TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to get a
 * pre-configured Json Object Mapper that understands serialization and deserialization of CEF Message
 *
 * @author Rajiv Singla . Creation Date: 10/24/2016.
 */
110 public abstract class TCAUtils extends AnalyticsModelJsonUtils {
112 private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
115 * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
118 private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
120 public int compare(Threshold threshold1, Threshold threshold2) {
121 return threshold1.getSeverity().compareTo(threshold2.getSeverity());
126 * {@link Function} that extracts {@link TCAPolicy#getMetricsPerEventName()} from {@link TCAPolicy}
128 * @return TCA Policy Metrics Per Event Name list
130 public static Function<TCAPolicy, List<MetricsPerEventName>> tcaPolicyMetricsExtractorFunction() {
131 return new Function<TCAPolicy, List<MetricsPerEventName>>() {
134 public List<MetricsPerEventName> apply(@Nonnull TCAPolicy tcaPolicy) {
135 return tcaPolicy.getMetricsPerEventName();
141 * {@link Function} that extracts {@link MetricsPerEventName#getEventName()} from
142 * {@link MetricsPerEventName}
144 * @return Event Names or a Metrics Per Event Name object
146 public static Function<MetricsPerEventName, String> tcaEventNameExtractorFunction() {
147 return new Function<MetricsPerEventName, String>() {
149 public String apply(@Nonnull MetricsPerEventName metricsPerEventName) {
150 return metricsPerEventName.getEventName();
157 * Extracts {@link TCAPolicy} Event Names
159 * @param tcaPolicy TCA Policy
160 * @return List of event names in the TCA Policy
162 public static List<String> getPolicyEventNames(@Nonnull final TCAPolicy tcaPolicy) {
163 final List<MetricsPerEventName> metricsPerEventNames =
164 tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
166 return Lists.transform(metricsPerEventNames, tcaEventNameExtractorFunction());
170 * A {@link Supplier} which caches {@link TCAPolicy} Event names as they are not expected to
171 * change during runtime
173 * @param tcaPolicy TCA Policy
174 * @return a Supplier that memoize the TCA Policy event names
176 public static Supplier<List<String>> getPolicyEventNamesSupplier(@Nonnull final TCAPolicy tcaPolicy) {
177 return Suppliers.memoize(new Supplier<List<String>>() {
179 public List<String> get() {
180 return getPolicyEventNames(tcaPolicy);
187 * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Event Name and Threshold Field path
189 * @param tcaPolicy TCA Policy
190 * @return A table with Keys of event name and field path containing List of threshold as values
192 public static Table<String, String, List<Threshold>> getPolicyEventNameThresholdsTable(final TCAPolicy tcaPolicy) {
193 final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
194 for (MetricsPerEventName metricsPerEventName : tcaPolicy.getMetricsPerEventName()) {
195 final String eventName = metricsPerEventName.getEventName();
196 final List<Threshold> thresholds = metricsPerEventName.getThresholds();
197 for (Threshold threshold : thresholds) {
198 final List<Threshold> existingThresholds = domainFRTable.get(eventName, threshold.getFieldPath());
199 if (existingThresholds == null) {
200 final LinkedList<Threshold> newThresholdList = new LinkedList<>();
201 newThresholdList.add(threshold);
202 domainFRTable.put(eventName, threshold.getFieldPath(), newThresholdList);
204 domainFRTable.get(eventName, threshold.getFieldPath()).add(threshold);
208 return domainFRTable;
213 * A {@link Supplier} which caches Policy Event Name and Threshold Field Path Thresholds lookup table
215 * @param tcaPolicy TCA Policy
216 * @return Cached Supplier for table with Keys of event Name and field path containing thresholds as values
218 public static Supplier<Table<String, String, List<Threshold>>> getPolicyEventNameThresholdsTableSupplier
219 (final TCAPolicy tcaPolicy) {
220 return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
222 public Table<String, String, List<Threshold>> get() {
223 return getPolicyEventNameThresholdsTable(tcaPolicy);
230 * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
231 * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyEventNameFilter}s to
232 * filter out messages which does not match policy domain or event Name
234 * @param cefMessage CEF Message
235 * @param tcaPolicy TCA Policy
236 * @return Message Process Context after processing filter chain
238 public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
239 @Nonnull final TCAPolicy tcaPolicy) {
241 final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
242 final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
243 final TCACEFPolicyEventNameFilter eventNameFilter = new TCACEFPolicyEventNameFilter();
244 // Create a list of message processors
245 final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
246 ImmutableList.of(jsonProcessor, domainFilter, eventNameFilter);
247 final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
248 // Create a message processors chain
249 final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
250 new GenericMessageChainProcessor<>(messageProcessors, processorContext);
252 return tcaProcessingChain.processChain();
257 * Extracts json path values for given json Field Paths from using Json path notation. Assumes
258 * that values extracted are always long
260 * @param message CEF Message
261 * @param jsonFieldPaths Json Field Paths
262 * @return Map containing key as json path and values as values associated with that json path
264 public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
267 final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>();
268 final DocumentContext documentContext = JsonPath.parse(message);
270 for (String jsonFieldPath : jsonFieldPaths) {
271 final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() {
273 // If Json Field Values are not or empty
274 if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
275 // Filter out all null values in the filed values list
276 final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
277 Predicates.<Long>notNull()));
278 // If there are non null values put them in the map
279 if (!nonNullValues.isEmpty()) {
280 jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
285 return jsonFieldPathMap;
289 * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
290 * it applies threshold in order of their severity and record the first threshold per message field path
292 * @param messageFieldValues Field Path Values extracted from CEF Message
293 * @param fieldThresholds Policy Thresholds for Field Path
294 * @return Optional of violated threshold for a field path
296 public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold>
298 // order thresholds by severity
299 Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
300 // Now apply each threshold to field values
301 for (Threshold fieldThreshold : fieldThresholds) {
302 for (Long messageFieldValue : messageFieldValues) {
303 final Boolean isThresholdViolated =
304 fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue());
305 if (isThresholdViolated) {
306 final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
307 violatedThreshold.setActualFieldValue(messageFieldValue);
308 return Optional.of(violatedThreshold);
312 return Optional.absent();
316 * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
317 * Grabs first highest priority violated threshold
319 * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
320 * @return First Highest priority violated threshold
322 public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
324 final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
326 if (violatedThresholds.size() == 1) {
327 return violatedThresholds.get(0);
329 Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
330 // Just grab the first violated threshold with highest priority
331 return violatedThresholds.get(0);
336 * Creates {@link MetricsPerEventName} object which contains violated thresholds
338 * @param tcaPolicy TCA Policy
339 * @param violatedThreshold Violated thresholds
340 * @param eventName Event Name
342 * @return MetricsPerEventName object containing one highest severity violated threshold
344 public static MetricsPerEventName createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
345 @Nonnull final Threshold violatedThreshold,
346 @Nonnull final String eventName) {
348 final ArrayList<MetricsPerEventName> metricsPerEventNames = newArrayList(
349 Iterables.filter(tcaPolicy.getMetricsPerEventName(), new Predicate<MetricsPerEventName>() {
351 public boolean apply(@Nonnull MetricsPerEventName metricsPerEventName) {
352 return metricsPerEventName.getEventName().equals(eventName);
355 // TCA policy must have only one metrics per event Name
356 if (metricsPerEventNames.size() == 1) {
357 final MetricsPerEventName violatedMetrics =
358 MetricsPerEventName.copy(metricsPerEventNames.get(0));
359 violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
360 return violatedMetrics;
362 final String errorMessage = String.format("TCA Policy must contain eventName: %s", eventName);
363 throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
368 * Computes threshold violations
370 * @param processorContext Filtered processor Context
371 * @return processor context with any threshold violations
373 public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
374 final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
375 return policyThresholdsProcessor.apply(processorContext);
380 * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
383 * @param processorContextWithViolations processor context which has TCA violations
384 * @param tcaAppName tca app name
385 * @param isAlertInCEFFormat determines if output alert is in CEF format
387 * @return TCA Alert String
389 * @throws JsonProcessingException If alert cannot be parsed into JSON String
391 public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
392 final String tcaAppName,
393 final Boolean isAlertInCEFFormat) throws JsonProcessingException {
394 if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
395 final EventListener eventListenerWithViolations =
396 addThresholdViolationFields(processorContextWithViolations);
397 final String alertString = writeValueAsString(eventListenerWithViolations);
398 LOG.debug("Created alert in CEF Format: {}", alertString);
401 final TCAVESResponse newTCAVESResponse =
402 createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
403 final String alertString = writeValueAsString(newTCAVESResponse);
404 LOG.debug("Created alert in Non CEF Format: {}", alertString);
410 * Adds threshold violation fields to {@link EventListener}
412 * @param processorContextWithViolations processor context that contains violations
413 * @return event listener with threshold crossing alert fields populated
415 public static EventListener addThresholdViolationFields(
416 final TCACEFProcessorContext processorContextWithViolations) {
418 final MetricsPerEventName metricsPerEventName =
419 processorContextWithViolations.getMetricsPerEventName();
420 // confirm violations are indeed present
421 if (metricsPerEventName == null) {
422 final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
423 throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
426 // get violated threshold
427 final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
428 final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
429 final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
431 // create new threshold crossing alert fields
432 final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
433 thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
434 thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
435 thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
436 thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
437 thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
438 thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
439 thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
440 thresholdCrossingAlertFields.setElementType(commonEventHeader.getEventName());
442 // create new performance count
443 final PerformanceCounter performanceCounter = new PerformanceCounter();
444 performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
445 performanceCounter.setName(violatedThreshold.getFieldPath());
446 performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
447 performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
449 // set additional parameters for threshold crossing alert fields
450 thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
452 // add threshold crossing fields to existing event listener
453 eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
455 return eventListener;
459 * Converts {@link EventSeverity} to {@link Criticality}
461 * @param eventSeverity event severity
463 * @return performance counter criticality
465 private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
466 switch (eventSeverity) {
468 return Criticality.CRIT;
470 return Criticality.MAJ;
472 return Criticality.UNKNOWN;
477 * Creates {@link TCAVESResponse} object
479 * @param processorContext processor Context with violations
480 * @param tcaAppName TCA App Name
482 * @return TCA VES Response Message
484 public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
485 final String tcaAppName) {
487 final MetricsPerEventName metricsPerEventName = processorContext.getMetricsPerEventName();
488 // confirm violations are indeed present
489 if (metricsPerEventName == null) {
490 final String errorMessage = "No violations metrics. Unable to create VES Response";
491 throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
494 final Threshold violatedThreshold = metricsPerEventName.getThresholds().get(0);
495 final EventListener eventListener = processorContext.getCEFEventListener();
496 final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
498 final TCAVESResponse tcavesResponse = new TCAVESResponse();
499 // ClosedLoopControlName included in the DCAE configuration Policy
500 tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
501 // version included in the DCAE configuration Policy
502 tcavesResponse.setVersion(violatedThreshold.getVersion());
503 // Generate a UUID for this output message
504 tcavesResponse.setRequestID(UUID.randomUUID().toString());
505 // commonEventHeader.startEpochMicrosec from the received VES measurementsForVfScaling message
506 tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
507 // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
508 // TODO: Find out how to get this field
509 tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
511 final AAI aai = new AAI();
512 tcavesResponse.setAai(aai);
514 // VM specific settings
515 if (metricsPerEventName.getControlLoopSchemaType() == ControlLoopSchemaType.VM) {
517 tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET_TYPE);
518 // Hard Coded - "vserver.vserver-name"
519 tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VM_TARGET);
520 aai.setGenericServerId(commonEventHeader.getReportingEntityName());
522 // VNF specific settings
523 // Hard Coded - "VNF"
524 tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET_TYPE);
525 // Hard Coded - "generic-vnf.vnf-id"
526 tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_VNF_TARGET);
527 // commonEventHeader.reportingEntityName from the received VES measurementsForVfScaling message (value for
528 // the data element used in A&AI)
529 aai.setGenericVNFId(commonEventHeader.getReportingEntityName());
532 // Hard Coded - "DCAE"
533 tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
534 // policyScope included in the DCAE configuration Policy
535 tcavesResponse.setPolicyScope(metricsPerEventName.getPolicyScope());
536 // policyName included in the DCAE configuration Policy
537 tcavesResponse.setPolicyName(metricsPerEventName.getPolicyName());
538 // policyVersion included in the DCAE configuration Policy
539 tcavesResponse.setPolicyVersion(metricsPerEventName.getPolicyVersion());
540 // Extracted from violated threshold
541 tcavesResponse.setClosedLoopEventStatus(violatedThreshold.getClosedLoopEventStatus().name());
543 return tcavesResponse;
548 * Extract Domain and Event Name from processor context if present
550 * @param processorContext processor context
551 * @return Tuple of domain and event Name
553 public static Pair<String, String> getDomainAndEventName(
554 @Nullable final TCACEFProcessorContext processorContext) {
556 String domain = null;
557 String eventName = null;
559 if (processorContext != null &&
560 processorContext.getCEFEventListener() != null &&
561 processorContext.getCEFEventListener().getEvent() != null &&
562 processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
563 final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
564 .getCommonEventHeader();
566 if (commonEventHeader.getDomain() != null) {
567 domain = commonEventHeader.getDomain().name();
570 if (commonEventHeader.getEventName() != null) {
571 eventName = commonEventHeader.getEventName();
576 return new ImmutablePair<>(domain, eventName);
581 * Creates {@link TCAPolicy} Metrics per Event Name list
583 * @param eventNamesMap Map containing event Name as key and corresponding values
585 * @return List of {@link MetricsPerEventName}
587 public static List<MetricsPerEventName> createTCAPolicyMetricsPerEventNameList(
588 final Map<String, Map<String, String>> eventNamesMap) {
590 // create a new metrics per event Name list
591 final List<MetricsPerEventName> metricsPerEventNames = new LinkedList<>();
593 for (Map.Entry<String, Map<String, String>> eventNamesEntry : eventNamesMap.entrySet()) {
595 // create new metrics per event Name instance
596 final MetricsPerEventName newMetricsPerEventName =
597 createNewMetricsPerEventName(eventNamesEntry);
598 metricsPerEventNames.add(newMetricsPerEventName);
600 // determine all threshold related values
601 final Map<String, String> thresholdsValuesMaps =
602 filterMapByKeyNamePrefix(eventNamesEntry.getValue(),
603 AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
605 // create a map of all threshold values
606 final Map<String, Map<String, String>> thresholdsMap =
607 extractSubTree(thresholdsValuesMaps, 1, 2,
608 AnalyticsConstants.TCA_POLICY_DELIMITER);
610 // add thresholds to nmetrics per event Names threshold list
611 for (Map<String, String> thresholdMap : thresholdsMap.values()) {
612 newMetricsPerEventName.getThresholds().add(createNewThreshold(thresholdMap));
617 return metricsPerEventNames;
621 * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
623 * @param thresholdMap threshold map with threshold values
625 * @return new instance of TCA Policy Threshold
627 public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
628 final Threshold threshold = new Threshold();
629 threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
630 threshold.setVersion(thresholdMap.get("policy.version"));
631 threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
632 threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
633 threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
634 threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
635 threshold.setClosedLoopEventStatus(
636 ControlLoopEventStatus.valueOf(thresholdMap.get("policy.closedLoopEventStatus")));
641 * Create new {@link MetricsPerEventName} instance with policy Name, policy Version and policy Scope
642 * extracted from given eventNamesEntry
644 * @param eventNamesEntry Event Names Entry
646 * @return new instance of MetricsPerEventName
648 public static MetricsPerEventName createNewMetricsPerEventName(
649 final Map.Entry<String, Map<String, String>> eventNamesEntry) {
650 // determine event Name
651 final String eventName = eventNamesEntry.getKey();
652 // determine event Name thresholds
653 final Map<String, String> metricsPerEventNameThresholdsMap = eventNamesEntry.getValue();
654 final MetricsPerEventName metricsPerEventName = new MetricsPerEventName();
655 final List<Threshold> thresholds = new LinkedList<>();
656 metricsPerEventName.setThresholds(thresholds);
657 metricsPerEventName.setEventName(eventName);
658 // bind policyName, policyVersion, policyScope and closedLoopControlName
659 metricsPerEventName.setPolicyName(metricsPerEventNameThresholdsMap.get("policyName"));
660 metricsPerEventName.setPolicyVersion(metricsPerEventNameThresholdsMap.get("policyVersion"));
661 metricsPerEventName.setPolicyScope(metricsPerEventNameThresholdsMap.get("policyScope"));
662 metricsPerEventName.setControlLoopSchemaType(ControlLoopSchemaType.valueOf(
663 metricsPerEventNameThresholdsMap.get("closedLoopControlName")));
664 return metricsPerEventName;
668 * Converts a flattened key/value map which has keys delimited by a given delimiter.
669 * The start Index and end index extract the sub-key value and returns a new map containing
670 * sub-keys and values.
672 * @param actualMap actual Map
673 * @param startIndex start index
674 * @param endIndex end index
675 * @param delimiter delimiter
677 * @return Map with new sub tree map
679 public static Map<String, Map<String, String>> extractSubTree(
680 final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
682 final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
684 // iterate over actual map entries
685 for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
686 final String actualMapKey = actualMapEntry.getKey();
687 final String actualMapValue = actualMapEntry.getValue();
689 // determine delimiter start and end index
690 final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
691 final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
692 final int keyLength = actualMapKey.length();
694 // extract sub-tree map
695 if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
696 final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
697 final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
698 final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
699 if (existingThresholdMap == null) {
700 Map<String, String> newThresholdMap = new LinkedHashMap<>();
701 newThresholdMap.put(subMapKey, actualMapValue);
702 subTreeMap.put(thresholdKey, newThresholdMap);
704 existingThresholdMap.put(subMapKey, actualMapValue);
716 * Provides a view of underlying map that filters out entries with keys starting with give prefix
718 * @param actualMap Target map that needs to be filtered
719 * @param keyNamePrefix key prefix
721 * @return a view of actual map which only show entries which have give prefix
723 public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
724 final String keyNamePrefix) {
725 return Maps.filterKeys(actualMap,
726 new Predicate<String>() {
728 public boolean apply(@Nullable String key) {
729 return key != null && key.startsWith(keyNamePrefix);
736 * Creates Quartz Scheduler
738 * @param pollingIntervalMS polling interval
739 * @param stdSchedulerFactory Quartz standard schedule factory instance
740 * @param quartzPublisherPropertiesFileName quartz properties file name
741 * @param jobDataMap job Data map
742 * @param quartzJobClass Quartz Job Class
743 * @param quartzJobName Quartz Job Name
744 * @param quartzTriggerName Quartz Trigger name
746 * @param <T> An implementation of Quartz {@link Job} interface
747 * @return Configured Quartz Scheduler
749 * @throws SchedulerException exception if unable to create to Quartz Scheduler
751 public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
752 final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
753 final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
754 final String quartzTriggerName) throws SchedulerException {
756 // Initialize a new Quartz Standard scheduler
757 LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
758 quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
759 final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
760 quartzPublisherPropertiesFileName, new Properties());
761 stdSchedulerFactory.initialize(quartzProperties);
762 final Scheduler scheduler = stdSchedulerFactory.getScheduler();
764 // Create a new job detail
765 final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
766 AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
768 // Create a new scheduling builder
769 final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
770 .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
771 .repeatForever(); // repeats while worker is running
773 // Create a trigger for the TCA Publisher Job
774 final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
775 .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
776 .startNow() // job starts right away
777 .withSchedule(simpleScheduleBuilder).build();
779 scheduler.scheduleJob(jobDetail, simpleTrigger);
780 LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());