2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.tca.utils;
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.google.common.base.Function;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Predicate;
27 import com.google.common.base.Predicates;
28 import com.google.common.base.Supplier;
29 import com.google.common.base.Suppliers;
30 import com.google.common.collect.HashBasedTable;
31 import com.google.common.collect.ImmutableList;
32 import com.google.common.collect.Iterables;
33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Maps;
35 import com.google.common.collect.Table;
36 import com.jayway.jsonpath.DocumentContext;
37 import com.jayway.jsonpath.JsonPath;
38 import com.jayway.jsonpath.TypeRef;
39 import org.apache.commons.lang3.StringUtils;
40 import org.apache.commons.lang3.tuple.ImmutablePair;
41 import org.apache.commons.lang3.tuple.Pair;
42 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
43 import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
44 import org.openecomp.dcae.apod.analytics.common.service.processor.AbstractMessageProcessor;
45 import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor;
46 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertAction;
47 import org.openecomp.dcae.apod.analytics.model.domain.cef.AlertType;
48 import org.openecomp.dcae.apod.analytics.model.domain.cef.CommonEventHeader;
49 import org.openecomp.dcae.apod.analytics.model.domain.cef.Criticality;
50 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
51 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventSeverity;
52 import org.openecomp.dcae.apod.analytics.model.domain.cef.PerformanceCounter;
53 import org.openecomp.dcae.apod.analytics.model.domain.cef.ThresholdCrossingAlertFields;
54 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Direction;
55 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerFunctionalRole;
56 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
57 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
58 import org.openecomp.dcae.apod.analytics.model.facade.tca.AAI;
59 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
60 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelIOUtils;
61 import org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
62 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFJsonProcessor;
63 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyDomainFilter;
64 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyFunctionalRoleFilter;
65 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFPolicyThresholdsProcessor;
66 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
67 import org.quartz.Job;
68 import org.quartz.JobBuilder;
69 import org.quartz.JobDataMap;
70 import org.quartz.JobDetail;
71 import org.quartz.Scheduler;
72 import org.quartz.SchedulerException;
73 import org.quartz.SimpleScheduleBuilder;
74 import org.quartz.SimpleTrigger;
75 import org.quartz.TriggerBuilder;
76 import org.quartz.impl.StdSchedulerFactory;
77 import org.slf4j.Logger;
78 import org.slf4j.LoggerFactory;
80 import java.util.ArrayList;
81 import java.util.Collections;
82 import java.util.Comparator;
83 import java.util.Date;
84 import java.util.HashMap;
85 import java.util.LinkedHashMap;
86 import java.util.LinkedList;
87 import java.util.List;
89 import java.util.Properties;
91 import java.util.SortedMap;
92 import java.util.TreeMap;
93 import java.util.UUID;
95 import javax.annotation.Nonnull;
96 import javax.annotation.Nullable;
98 import static com.google.common.collect.Lists.newArrayList;
99 import static org.apache.commons.lang3.time.DateFormatUtils.SMTP_DATETIME_FORMAT;
/**
 * Utility helper methods for the TCA sub module only. Extends {@link AnalyticsModelJsonUtils} to reuse its
 * pre-configured JSON object mapper, which understands serialization and deserialization of CEF messages.
 *
 * @author Rajiv Singla . Creation Date: 10/24/2016.
 */
108 public abstract class TCAUtils extends AnalyticsModelJsonUtils {
110 private static final Logger LOG = LoggerFactory.getLogger(TCAUtils.class);
113 * Threshold Comparator which is used to order thresholds based on their severity e.g. ( CRITICAL, MAJOR, MINOR,
116 private static final Comparator<Threshold> THRESHOLD_COMPARATOR = new Comparator<Threshold>() {
118 public int compare(Threshold threshold1, Threshold threshold2) {
119 return threshold1.getSeverity().compareTo(threshold2.getSeverity());
124 * {@link Function} that extracts {@link TCAPolicy#getMetricsPerFunctionalRole()} from {@link TCAPolicy}
126 * @return TCA Policy Metrics Per Functional Roles List
128 public static Function<TCAPolicy, List<MetricsPerFunctionalRole>> tcaPolicyMetricsExtractorFunction() {
129 return new Function<TCAPolicy, List<MetricsPerFunctionalRole>>() {
132 public List<MetricsPerFunctionalRole> apply(@Nonnull TCAPolicy tcaPolicy) {
133 return tcaPolicy.getMetricsPerFunctionalRole();
139 * {@link Function} that extracts {@link MetricsPerFunctionalRole#getFunctionalRole()} from
140 * {@link MetricsPerFunctionalRole}
142 * @return Functional role or a Metrics Per Functional Role object
144 public static Function<MetricsPerFunctionalRole, String> tcaFunctionalRoleExtractorFunction() {
145 return new Function<MetricsPerFunctionalRole, String>() {
147 public String apply(@Nonnull MetricsPerFunctionalRole metricsPerFunctionalRole) {
148 return metricsPerFunctionalRole.getFunctionalRole();
155 * Extracts {@link TCAPolicy} Functional Roles
157 * @param tcaPolicy TCA Policy
158 * @return List of functional Roles in the tca Policy
160 public static List<String> getPolicyFunctionalRoles(@Nonnull final TCAPolicy tcaPolicy) {
161 final List<MetricsPerFunctionalRole> metricsPerFunctionalRoles =
162 tcaPolicyMetricsExtractorFunction().apply(tcaPolicy);
164 return Lists.transform(metricsPerFunctionalRoles, tcaFunctionalRoleExtractorFunction());
168 * A {@link Supplier} which caches {@link TCAPolicy} Functional Roles as they are not expected to
169 * change during runtime
171 * @param tcaPolicy TCA Policy
172 * @return a Supplier that memoize the Functional roles
174 public static Supplier<List<String>> getPolicyFunctionalRoleSupplier(@Nonnull final TCAPolicy tcaPolicy) {
175 return Suppliers.memoize(new Supplier<List<String>>() {
177 public List<String> get() {
178 return getPolicyFunctionalRoles(tcaPolicy);
185 * Creates a Table to lookup thresholds of a {@link TCAPolicy} by its Functional Role and Threshold Field path
187 * @param tcaPolicy TCA Policy
188 * @return A table with Keys of functional role and field path containing List of threshold as values
190 public static Table<String, String, List<Threshold>> getPolicyFRThresholdsTable(final TCAPolicy tcaPolicy) {
191 final Table<String, String, List<Threshold>> domainFRTable = HashBasedTable.create();
192 for (MetricsPerFunctionalRole metricsPerFunctionalRole : tcaPolicy.getMetricsPerFunctionalRole()) {
193 final String functionalRole = metricsPerFunctionalRole.getFunctionalRole();
194 final List<Threshold> thresholds = metricsPerFunctionalRole.getThresholds();
195 for (Threshold threshold : thresholds) {
196 final List<Threshold> existingThresholds = domainFRTable.get(functionalRole, threshold.getFieldPath());
197 if (existingThresholds == null) {
198 final LinkedList<Threshold> newThresholdList = new LinkedList<>();
199 newThresholdList.add(threshold);
200 domainFRTable.put(functionalRole, threshold.getFieldPath(), newThresholdList);
202 domainFRTable.get(functionalRole, threshold.getFieldPath()).add(threshold);
206 return domainFRTable;
211 * A {@link Supplier} which caches Policy Functional Role and Threshold Field Path Thresholds lookup table
213 * @param tcaPolicy TCA Policy
214 * @return Cached Supplier for table with Keys of functional role and field path containing thresholds as values
216 public static Supplier<Table<String, String, List<Threshold>>> getPolicyFRThresholdsTableSupplier
217 (final TCAPolicy tcaPolicy) {
218 return Suppliers.memoize(new Supplier<Table<String, String, List<Threshold>>>() {
220 public Table<String, String, List<Threshold>> get() {
221 return getPolicyFRThresholdsTable(tcaPolicy);
228 * Creates a {@link GenericMessageChainProcessor} of {@link TCACEFJsonProcessor},
229 * {@link TCACEFPolicyDomainFilter} and {@link TCACEFPolicyFunctionalRoleFilter}s to
230 * filter out messages which does not match policy domain or functional role
232 * @param cefMessage CEF Message
233 * @param tcaPolicy TCA Policy
234 * @return Message Process Context after processing filter chain
236 public static TCACEFProcessorContext filterCEFMessage(@Nullable final String cefMessage,
237 @Nonnull final TCAPolicy tcaPolicy) {
239 final TCACEFJsonProcessor jsonProcessor = new TCACEFJsonProcessor();
240 final TCACEFPolicyDomainFilter domainFilter = new TCACEFPolicyDomainFilter();
241 final TCACEFPolicyFunctionalRoleFilter functionalRoleFilter = new TCACEFPolicyFunctionalRoleFilter();
242 // Create a list of message processors
243 final ImmutableList<AbstractMessageProcessor<TCACEFProcessorContext>> messageProcessors =
244 ImmutableList.of(jsonProcessor, domainFilter, functionalRoleFilter);
245 final TCACEFProcessorContext processorContext = new TCACEFProcessorContext(cefMessage, tcaPolicy);
246 // Create a message processors chain
247 final GenericMessageChainProcessor<TCACEFProcessorContext> tcaProcessingChain =
248 new GenericMessageChainProcessor<>(messageProcessors, processorContext);
250 return tcaProcessingChain.processChain();
255 * Extracts json path values for given json Field Paths from using Json path notation. Assumes
256 * that values extracted are always long
258 * @param message CEF Message
259 * @param jsonFieldPaths Json Field Paths
260 * @return Map containing key as json path and values as values associated with that json path
262 public static Map<String, List<Long>> getJsonPathValue(@Nonnull String message, @Nonnull Set<String>
265 final Map<String, List<Long>> jsonFieldPathMap = new HashMap<>();
266 final DocumentContext documentContext = JsonPath.parse(message);
268 for (String jsonFieldPath : jsonFieldPaths) {
269 final List<Long> jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<Long>>() {
271 // If Json Field Values are not or empty
272 if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
273 // Filter out all null values in the filed values list
274 final List<Long> nonNullValues = Lists.newLinkedList(Iterables.filter(jsonFieldValues,
275 Predicates.<Long>notNull()));
276 // If there are non null values put them in the map
277 if (!nonNullValues.isEmpty()) {
278 jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
283 return jsonFieldPathMap;
287 * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
288 * it applies threshold in order of their severity and record the first threshold per message field path
290 * @param messageFieldValues Field Path Values extracted from CEF Message
291 * @param fieldThresholds Policy Thresholds for Field Path
292 * @return Optional of violated threshold for a field path
294 public static Optional<Threshold> thresholdCalculator(final List<Long> messageFieldValues, final List<Threshold>
296 // order thresholds by severity
297 Collections.sort(fieldThresholds, THRESHOLD_COMPARATOR);
298 // Now apply each threshold to field values
299 for (Threshold fieldThreshold : fieldThresholds) {
300 for (Long messageFieldValue : messageFieldValues) {
301 final Boolean isThresholdViolated =
302 fieldThreshold.getDirection().operate(messageFieldValue, fieldThreshold.getThresholdValue());
303 if (isThresholdViolated) {
304 final Threshold violatedThreshold = Threshold.copy(fieldThreshold);
305 violatedThreshold.setActualFieldValue(messageFieldValue);
306 return Optional.of(violatedThreshold);
310 return Optional.absent();
314 * Prioritize Threshold to be reported in case there was multiple TCA violations in a single CEF message.
315 * Grabs first highest priority violated threshold
317 * @param violatedThresholdsMap Map containing field Path and associated violated Thresholds
318 * @return First Highest priority violated threshold
320 public static Threshold prioritizeThresholdViolations(final Map<String, Threshold> violatedThresholdsMap) {
322 final List<Threshold> violatedThresholds = newArrayList(violatedThresholdsMap.values());
324 if (violatedThresholds.size() == 1) {
325 return violatedThresholds.get(0);
327 Collections.sort(violatedThresholds, THRESHOLD_COMPARATOR);
328 // Just grab the first violated threshold with highest priority
329 return violatedThresholds.get(0);
334 * Creates {@link MetricsPerFunctionalRole} object which contains violated thresholds
336 * @param tcaPolicy TCA Policy
337 * @param violatedThreshold Violated thresholds
338 * @param functionalRole Functional Role
340 * @return MetricsPerFunctionalRole object containing one highest severity violated threshold
342 public static MetricsPerFunctionalRole createViolatedMetrics(@Nonnull final TCAPolicy tcaPolicy,
343 @Nonnull final Threshold violatedThreshold,
344 @Nonnull final String functionalRole) {
346 final ArrayList<MetricsPerFunctionalRole> metricsPerFunctionalRoles = newArrayList(
347 Iterables.filter(tcaPolicy.getMetricsPerFunctionalRole(), new Predicate<MetricsPerFunctionalRole>() {
349 public boolean apply(@Nonnull MetricsPerFunctionalRole metricsPerFunctionalRole) {
350 return metricsPerFunctionalRole.getFunctionalRole().equals(functionalRole);
353 // TCA policy must have only one metrics role per functional role
354 if (metricsPerFunctionalRoles.size() == 1) {
355 final MetricsPerFunctionalRole violatedMetrics =
356 MetricsPerFunctionalRole.copy(metricsPerFunctionalRoles.get(0));
357 violatedMetrics.setThresholds(ImmutableList.of(Threshold.copy(violatedThreshold)));
358 return violatedMetrics;
360 final String errorMessage = String.format("TCA Policy must contain functional Role: %s", functionalRole);
361 throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
366 * Computes threshold violations
368 * @param processorContext Filtered processor Context
369 * @return processor context with any threshold violations
371 public static TCACEFProcessorContext computeThresholdViolations(final TCACEFProcessorContext processorContext) {
372 final TCACEFPolicyThresholdsProcessor policyThresholdsProcessor = new TCACEFPolicyThresholdsProcessor();
373 return policyThresholdsProcessor.apply(processorContext);
378 * Creates TCA Alert String - Alert String is created in both {@link EventListener} or {@link TCAVESResponse}
381 * @param processorContextWithViolations processor context which has TCA violations
382 * @param tcaAppName tca app name
383 * @param isAlertInCEFFormat determines if output alert is in CEF format
385 * @return TCA Alert String
387 * @throws JsonProcessingException If alert cannot be parsed into JSON String
389 public static String createTCAAlertString(final TCACEFProcessorContext processorContextWithViolations,
390 final String tcaAppName,
391 final Boolean isAlertInCEFFormat) throws JsonProcessingException {
392 if (isAlertInCEFFormat != null && isAlertInCEFFormat) {
393 final EventListener eventListenerWithViolations =
394 addThresholdViolationFields(processorContextWithViolations);
395 final String alertString = writeValueAsString(eventListenerWithViolations);
396 LOG.debug("Created alert in CEF Format: {}", alertString);
399 final TCAVESResponse newTCAVESResponse =
400 createNewTCAVESResponse(processorContextWithViolations, tcaAppName);
401 final String alertString = writeValueAsString(newTCAVESResponse);
402 LOG.debug("Created alert in Non CEF Format: {}", alertString);
408 * Adds threshold violation fields to {@link EventListener}
410 * @param processorContextWithViolations processor context that contains violations
411 * @return event listener with threshold crossing alert fields populated
413 public static EventListener addThresholdViolationFields(
414 final TCACEFProcessorContext processorContextWithViolations) {
416 final MetricsPerFunctionalRole metricsPerFunctionalRole =
417 processorContextWithViolations.getMetricsPerFunctionalRole();
418 // confirm violations are indeed present
419 if (metricsPerFunctionalRole == null) {
420 final String errorMessage = "No violations metrics. Unable to add Threshold Violation Fields";
421 throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
424 // get violated threshold
425 final Threshold violatedThreshold = metricsPerFunctionalRole.getThresholds().get(0);
426 final EventListener eventListener = processorContextWithViolations.getCEFEventListener();
427 final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
429 // create new threshold crossing alert fields
430 final ThresholdCrossingAlertFields thresholdCrossingAlertFields = new ThresholdCrossingAlertFields();
431 thresholdCrossingAlertFields.setEventStartTimestamp(commonEventHeader.getStartEpochMicrosec().toString());
432 thresholdCrossingAlertFields.setEventSeverity(violatedThreshold.getSeverity());
433 thresholdCrossingAlertFields.setCollectionTimestamp(SMTP_DATETIME_FORMAT.format(new Date()));
434 thresholdCrossingAlertFields.setAlertAction(AlertAction.SET);
435 thresholdCrossingAlertFields.setAlertType(AlertType.INTERFACE_ANOMALY);
436 thresholdCrossingAlertFields.setAlertDescription(violatedThreshold.getDirection().toString());
437 thresholdCrossingAlertFields.setInterfaceName(commonEventHeader.getReportingEntityName());
438 thresholdCrossingAlertFields.setElementType(commonEventHeader.getFunctionalRole());
440 // create new performance count
441 final PerformanceCounter performanceCounter = new PerformanceCounter();
442 performanceCounter.setCriticality(convertSeverityToCriticality(violatedThreshold.getSeverity()));
443 performanceCounter.setName(violatedThreshold.getFieldPath());
444 performanceCounter.setValue(violatedThreshold.getActualFieldValue().toString());
445 performanceCounter.setThresholdCrossed(violatedThreshold.getThresholdValue().toString());
447 // set additional parameters for threshold crossing alert fields
448 thresholdCrossingAlertFields.setAdditionalParameters(ImmutableList.of(performanceCounter));
450 // add threshold crossing fields to existing event listener
451 eventListener.getEvent().setThresholdCrossingAlertFields(thresholdCrossingAlertFields);
453 return eventListener;
457 * Converts {@link EventSeverity} to {@link Criticality}
459 * @param eventSeverity event severity
461 * @return performance counter criticality
463 private static Criticality convertSeverityToCriticality(final EventSeverity eventSeverity) {
464 switch (eventSeverity) {
466 return Criticality.CRIT;
468 return Criticality.MAJ;
470 return Criticality.UNKNOWN;
475 * Creates {@link TCAVESResponse} object
477 * @param processorContext processor Context with violations
478 * @param tcaAppName TCA App Name
480 * @return TCA VES Response Message
482 public static TCAVESResponse createNewTCAVESResponse(final TCACEFProcessorContext processorContext,
483 final String tcaAppName) {
485 final MetricsPerFunctionalRole metricsPerFunctionalRole = processorContext.getMetricsPerFunctionalRole();
486 // confirm violations are indeed present
487 if (metricsPerFunctionalRole == null) {
488 final String errorMessage = "No violations metrics. Unable to create VES Response";
489 throw new MessageProcessingException(errorMessage, LOG, new IllegalArgumentException(errorMessage));
492 final String functionalRole = metricsPerFunctionalRole.getFunctionalRole();
493 final Threshold violatedThreshold = metricsPerFunctionalRole.getThresholds().get(0);
494 final EventListener eventListener = processorContext.getCEFEventListener();
495 final CommonEventHeader commonEventHeader = eventListener.getEvent().getCommonEventHeader();
497 final TCAVESResponse tcavesResponse = new TCAVESResponse();
498 // ClosedLoopControlName included in the DCAE configuration Policy
499 tcavesResponse.setClosedLoopControlName(violatedThreshold.getClosedLoopControlName());
500 // version included in the DCAE configuration Policy
501 tcavesResponse.setVersion(violatedThreshold.getVersion());
502 // Generate a UUID for this output message
503 tcavesResponse.setRequestID(UUID.randomUUID().toString());
504 // commonEventHeader.startEpochMicrosec from the received VES measurementsForVfScaling message
505 tcavesResponse.setClosedLoopAlarmStart(commonEventHeader.getStartEpochMicrosec());
506 // Concatenate name of this DCAE instance and name for this TCA instance, separated by dot
507 // TODO: Find out how to get this field
508 tcavesResponse.setClosedLoopEventClient("DCAE_INSTANCE_ID." + tcaAppName);
510 final AAI aai = new AAI();
511 tcavesResponse.setAai(aai);
513 // vLoadBalancer specific settings
514 if (isFunctionalRoleVLoadBalancer(functionalRole)) {
516 tcavesResponse.setTargetType(AnalyticsConstants.LOAD_BALANCER_TCA_VES_RESPONSE_TARGET_TYPE);
517 // Hard Coded - "vserver.vserver-name"
518 tcavesResponse.setTarget(AnalyticsConstants.LOAD_BALANCER_TCA_VES_RESPONSE_TARGET);
519 aai.setGenericServerId(commonEventHeader.getReportingEntityName());
521 // Hard Coded - "VNF"
522 tcavesResponse.setTargetType(AnalyticsConstants.TCA_VES_RESPONSE_TARGET_TYPE);
523 // Hard Coded - "generic-vnf.vnf-id"
524 tcavesResponse.setTarget(AnalyticsConstants.TCA_VES_RESPONSE_TARGET);
525 // commonEventHeader.reportingEntityName from the received VES measurementsForVfScaling message (value for
526 // the data element used in A&AI)
527 aai.setGenericVNFId(commonEventHeader.getReportingEntityName());
530 // Hard Coded - "DCAE"
531 tcavesResponse.setFrom(AnalyticsConstants.TCA_VES_RESPONSE_FROM);
532 // policyScope included in the DCAE configuration Policy
533 tcavesResponse.setPolicyScope(metricsPerFunctionalRole.getPolicyScope());
534 // policyName included in the DCAE configuration Policy
535 tcavesResponse.setPolicyName(metricsPerFunctionalRole.getPolicyName());
536 // policyVersion included in the DCAE configuration Policy
537 tcavesResponse.setPolicyVersion(metricsPerFunctionalRole.getPolicyVersion());
538 // Hard Coded - "ONSET"
539 tcavesResponse.setClosedLoopEventStatus(AnalyticsConstants.TCA_VES_RESPONSE_CLOSED_LOOP_EVENT_STATUS);
541 return tcavesResponse;
545 * Determines if Functional Role is vLoadBalancer
547 * @param functionalRole functional Role to check
549 * @return return true if functional role is for vLoadBalancer
551 private static boolean isFunctionalRoleVLoadBalancer(final String functionalRole) {
552 return functionalRole.equals(AnalyticsConstants.LOAD_BALANCER_FUNCTIONAL_ROLE);
557 * Extract Domain and functional Role from processor context if present
559 * @param processorContext processor context
560 * @return Tuple of domain and functional role
562 public static Pair<String, String> getDomainAndFunctionalRole(@Nullable final TCACEFProcessorContext
565 String domain = null;
566 String functionalRole = null;
568 if (processorContext != null &&
569 processorContext.getCEFEventListener() != null &&
570 processorContext.getCEFEventListener().getEvent() != null &&
571 processorContext.getCEFEventListener().getEvent().getCommonEventHeader() != null) {
572 final CommonEventHeader commonEventHeader = processorContext.getCEFEventListener().getEvent()
573 .getCommonEventHeader();
575 if (commonEventHeader.getDomain() != null) {
576 domain = commonEventHeader.getDomain();
579 if (commonEventHeader.getFunctionalRole() != null) {
580 functionalRole = commonEventHeader.getFunctionalRole();
585 return new ImmutablePair<>(domain, functionalRole);
590 * Creates {@link TCAPolicy} Metrics per Functional Role list
592 * @param functionalRolesMap Map containing functional Roles as key and corresponding values
594 * @return List of {@link MetricsPerFunctionalRole}
596 public static List<MetricsPerFunctionalRole> createTCAPolicyMetricsPerFunctionalRoleList(
597 final Map<String, Map<String, String>> functionalRolesMap) {
599 // create a new metrics per functional role list
600 final List<MetricsPerFunctionalRole> metricsPerFunctionalRoles = new LinkedList<>();
602 for (Map.Entry<String, Map<String, String>> functionalRolesEntry : functionalRolesMap.entrySet()) {
604 // create new metrics per functional role instance
605 final MetricsPerFunctionalRole newMetricsPerFunctionalRole =
606 createNewMetricsPerFunctionalRole(functionalRolesEntry);
607 metricsPerFunctionalRoles.add(newMetricsPerFunctionalRole);
609 // determine all threshold related values
610 final Map<String, String> thresholdsValuesMaps =
611 filterMapByKeyNamePrefix(functionalRolesEntry.getValue(),
612 AnalyticsConstants.TCA_POLICY_THRESHOLDS_PATH_POSTFIX);
614 // create a map of all threshold values
615 final Map<String, Map<String, String>> thresholdsMap =
616 extractSubTree(thresholdsValuesMaps, 1, 2,
617 AnalyticsConstants.TCA_POLICY_DELIMITER);
619 // add thresholds to nmetrics per functional roles threshold list
620 for (Map<String, String> thresholdMap : thresholdsMap.values()) {
621 newMetricsPerFunctionalRole.getThresholds().add(createNewThreshold(thresholdMap));
626 return metricsPerFunctionalRoles;
630 * Creates new instance of TCA Policy {@link Threshold} with values extracted from thresholdMap
632 * @param thresholdMap threshold map with threshold values
634 * @return new instance of TCA Policy Threshold
636 public static Threshold createNewThreshold(final Map<String, String> thresholdMap) {
637 final Threshold threshold = new Threshold();
638 threshold.setClosedLoopControlName(thresholdMap.get("policy.closedLoopControlName"));
639 threshold.setVersion(thresholdMap.get("policy.version"));
640 threshold.setFieldPath(thresholdMap.get("policy.fieldPath"));
641 threshold.setDirection(Direction.valueOf(thresholdMap.get("policy.direction")));
642 threshold.setSeverity(EventSeverity.valueOf(thresholdMap.get("policy.severity")));
643 threshold.setThresholdValue(Long.valueOf(thresholdMap.get("policy.thresholdValue")));
648 * Create new {@link MetricsPerFunctionalRole} instance with policy Name, policy Version and policy Scope
649 * extracted from given functionalRolesEntry
651 * @param functionalRolesEntry Functional Role Entry
653 * @return new instance of MetricsPerFunctionalRole
655 public static MetricsPerFunctionalRole createNewMetricsPerFunctionalRole(
656 final Map.Entry<String, Map<String, String>> functionalRolesEntry) {
657 // determine functional Role
658 final String functionalRole = functionalRolesEntry.getKey();
659 // determine functional Role thresholds
660 final Map<String, String> metricsPerFunctionalRoleThresholdsMap = functionalRolesEntry.getValue();
661 final MetricsPerFunctionalRole metricsPerFunctionalRole = new MetricsPerFunctionalRole();
662 final List<Threshold> thresholds = new LinkedList<>();
663 metricsPerFunctionalRole.setThresholds(thresholds);
664 metricsPerFunctionalRole.setFunctionalRole(functionalRole);
665 // bind policyName, policyVersion and policyScope
666 metricsPerFunctionalRole.setPolicyName(metricsPerFunctionalRoleThresholdsMap.get("policyName"));
667 metricsPerFunctionalRole.setPolicyVersion(metricsPerFunctionalRoleThresholdsMap.get("policyVersion"));
668 metricsPerFunctionalRole.setPolicyScope(metricsPerFunctionalRoleThresholdsMap.get("policyScope"));
669 return metricsPerFunctionalRole;
673 * Converts a flattened key/value map which has keys delimited by a given delimiter.
674 * The start Index and end index extract the sub-key value and returns a new map containing
675 * sub-keys and values.
677 * @param actualMap actual Map
678 * @param startIndex start index
679 * @param endIndex end index
680 * @param delimiter delimiter
682 * @return Map with new sub tree map
684 public static Map<String, Map<String, String>> extractSubTree(
685 final Map<String, String> actualMap, int startIndex, int endIndex, String delimiter) {
687 final SortedMap<String, Map<String, String>> subTreeMap = new TreeMap<>();
689 // iterate over actual map entries
690 for (Map.Entry<String, String> actualMapEntry : actualMap.entrySet()) {
691 final String actualMapKey = actualMapEntry.getKey();
692 final String actualMapValue = actualMapEntry.getValue();
694 // determine delimiter start and end index
695 final int keyStartIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, startIndex);
696 final int keyEndIndex = StringUtils.ordinalIndexOf(actualMapKey, delimiter, endIndex);
697 final int keyLength = actualMapKey.length();
699 // extract sub-tree map
700 if (keyStartIndex != -1 && keyEndIndex != -1 && keyEndIndex > keyStartIndex && keyLength > keyEndIndex) {
701 final String thresholdKey = actualMapKey.substring(keyStartIndex + 1, keyEndIndex);
702 final Map<String, String> existingThresholdMap = subTreeMap.get(thresholdKey);
703 final String subMapKey = actualMapKey.substring(keyEndIndex + 1, keyLength);
704 if (existingThresholdMap == null) {
705 Map<String, String> newThresholdMap = new LinkedHashMap<>();
706 newThresholdMap.put(subMapKey, actualMapValue);
707 subTreeMap.put(thresholdKey, newThresholdMap);
709 existingThresholdMap.put(subMapKey, actualMapValue);
721 * Provides a view of underlying map that filters out entries with keys starting with give prefix
723 * @param actualMap Target map that needs to be filtered
724 * @param keyNamePrefix key prefix
726 * @return a view of actual map which only show entries which have give prefix
728 public static Map<String, String> filterMapByKeyNamePrefix(final Map<String, String> actualMap,
729 final String keyNamePrefix) {
730 return Maps.filterKeys(actualMap,
731 new Predicate<String>() {
733 public boolean apply(@Nullable String key) {
734 return key != null && key.startsWith(keyNamePrefix);
741 * Creates Quartz Scheduler
743 * @param pollingIntervalMS polling interval
744 * @param stdSchedulerFactory Quartz standard schedule factory instance
745 * @param quartzPublisherPropertiesFileName quartz properties file name
746 * @param jobDataMap job Data map
747 * @param quartzJobClass Quartz Job Class
748 * @param quartzJobName Quartz Job Name
749 * @param quartzTriggerName Quartz Trigger name
751 * @param <T> An implementation of Quartz {@link Job} interface
752 * @return Configured Quartz Scheduler
754 * @throws SchedulerException expection if unable to create to Quartz Scheduler
756 public static <T extends Job> Scheduler createQuartzScheduler(final Integer pollingIntervalMS,
757 final StdSchedulerFactory stdSchedulerFactory, final String quartzPublisherPropertiesFileName,
758 final JobDataMap jobDataMap, final Class<T> quartzJobClass, final String quartzJobName,
759 final String quartzTriggerName) throws SchedulerException {
761 // Initialize a new Quartz Standard scheduler
762 LOG.debug("Configuring quartz scheduler for Quartz Job: {} with properties file: {}",
763 quartzJobClass.getSimpleName(), quartzPublisherPropertiesFileName);
764 final Properties quartzProperties = AnalyticsModelIOUtils.loadPropertiesFile(
765 quartzPublisherPropertiesFileName, new Properties());
766 stdSchedulerFactory.initialize(quartzProperties);
767 final Scheduler scheduler = stdSchedulerFactory.getScheduler();
769 // Create a new job detail
770 final JobDetail jobDetail = JobBuilder.newJob(quartzJobClass).withIdentity(quartzJobName,
771 AnalyticsConstants.TCA_QUARTZ_GROUP_NAME).usingJobData(jobDataMap).build();
773 // Create a new scheduling builder
774 final SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule()
775 .withIntervalInMilliseconds(pollingIntervalMS) // job will use custom polling schedule
776 .repeatForever(); // repeats while worker is running
778 // Create a trigger for the TCA Publisher Job
779 final SimpleTrigger simpleTrigger = TriggerBuilder.newTrigger()
780 .withIdentity(quartzTriggerName, AnalyticsConstants.TCA_QUARTZ_GROUP_NAME)
781 .startNow() // job starts right away
782 .withSchedule(simpleScheduleBuilder).build();
784 scheduler.scheduleJob(jobDetail, simpleTrigger);
785 LOG.info("Scheduler Initialized successfully for JobName: {}", quartzJobClass.getSimpleName());