2 * ================================================================================
3 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
20 package org.onap.dcae.analytics.tca.core.util.function.calculation;
22 import com.jayway.jsonpath.DocumentContext;
23 import com.jayway.jsonpath.JsonPath;
24 import com.jayway.jsonpath.TypeRef;
26 import java.math.BigDecimal;
27 import java.util.Collections;
28 import java.util.Comparator;
29 import java.util.HashMap;
30 import java.util.LinkedHashMap;
31 import java.util.LinkedList;
32 import java.util.List;
34 import java.util.Objects;
35 import java.util.Optional;
37 import java.util.stream.Collectors;
39 import javax.annotation.Nonnull;
41 import org.onap.dcae.analytics.model.cef.CommonEventHeader;
42 import org.onap.dcae.analytics.model.cef.Event;
43 import org.onap.dcae.analytics.model.cef.EventListener;
44 import org.onap.dcae.analytics.tca.core.exception.TcaProcessingException;
45 import org.onap.dcae.analytics.tca.core.service.TcaExecutionContext;
46 import org.onap.dcae.analytics.tca.core.service.TcaResultContext;
47 import org.onap.dcae.analytics.tca.model.policy.MetricsPerEventName;
48 import org.onap.dcae.analytics.tca.model.policy.TcaPolicy;
49 import org.onap.dcae.analytics.tca.model.policy.Threshold;
52 * @author Rajiv Singla
54 public class TcaThresholdViolationCalculator implements TcaCalculationFunction {
57 public TcaExecutionContext calculate(final TcaExecutionContext tcaExecutionContext) {
59 final String cefMessage = tcaExecutionContext.getCefMessage();
60 final EventListener eventListener = tcaExecutionContext.getTcaProcessingContext().getEventListener();
61 final TcaPolicy tcaPolicy = tcaExecutionContext.getTcaPolicy();
64 final String cefEventName = Optional.ofNullable(eventListener)
65 .map(EventListener::getEvent)
66 .map(Event::getCommonEventHeader)
67 .map(CommonEventHeader::getEventName)
68 .orElseThrow(() -> new TcaProcessingException("Required Field: EventName not present"));
70 // Get Policy's metrics per event name matching CEF message event name
71 final MetricsPerEventName policyMetricsPerEventName =
72 tcaPolicy.getMetricsPerEventName().stream()
73 .filter(m -> m.getEventName().equalsIgnoreCase(cefEventName))
74 .findFirst().orElseThrow(() ->
75 new TcaProcessingException("Required Field: MetricsPerEventName not present"));
78 // get violated policy threshold for cef event name sorted by severity
79 final Optional<Threshold> thresholdOptional =
80 getViolatedThreshold(policyMetricsPerEventName.getThresholds(), cefMessage);
83 // Check if threshold violation is present
84 if (!thresholdOptional.isPresent()) {
85 final String earlyTerminationMessage = "No Policy Threshold violation detected in CEF Message";
86 setTerminatingMessage(earlyTerminationMessage, tcaExecutionContext, false);
87 return tcaExecutionContext;
91 // Threshold violations are present - update tca processing result context
92 final MetricsPerEventName violatedMetricsPerEventName = copyMetricsPerEventName(policyMetricsPerEventName);
93 final Threshold violatedThreshold = thresholdOptional.get();
94 violatedMetricsPerEventName.setThresholds(Collections.singletonList(violatedThreshold));
95 final TcaResultContext tcaResultContext =
96 tcaExecutionContext.getTcaResultContext();
97 tcaResultContext.setViolatedMetricsPerEventName(violatedMetricsPerEventName);
99 return tcaExecutionContext;
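/**
 * Provides the violated threshold, if any. When thresholds for several field paths are violated,
 * the one that sorts first by severity is returned.
 *
 * @param policyThresholds policy thresholds that need to be applied to the incoming CEF message
 * @param cefMessage incoming CEF message
 *
 * @return Optional of the violated threshold, or an empty Optional if no threshold is violated
 */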
111 private static Optional<Threshold> getViolatedThreshold(final List<Threshold> policyThresholds,
112 final String cefMessage) {
114 // map containing key as field path and associated policy thresholds
115 final Map<String, List<Threshold>> policyFieldPathsMap = new LinkedHashMap<>();
116 for (final Threshold policyThreshold : policyThresholds) {
117 if (policyFieldPathsMap.get(policyThreshold.getFieldPath()) == null) {
118 final LinkedList<Threshold> policyThresholdList = new LinkedList<>();
119 policyThresholdList.add(policyThreshold);
120 policyFieldPathsMap.put(policyThreshold.getFieldPath(), policyThresholdList);
122 policyFieldPathsMap.get(policyThreshold.getFieldPath()).add(policyThreshold);
126 // get map containing key as field path and values for json path
127 final Map<String, List<BigDecimal>> messageFieldValuesMap =
128 getJsonPathValues(cefMessage, policyFieldPathsMap.keySet());
130 // if no matching path values - assuming no threshold violations
131 if (messageFieldValuesMap.isEmpty()) {
132 return Optional.empty();
135 // Determine all violated thresholds per message field Path
136 final Map<String, Threshold> violatedThresholdsMap = new HashMap<>();
137 for (Map.Entry<String, List<BigDecimal>> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) {
138 final String messageFieldPath = messageFieldValuesMapEntry.getKey();
139 final List<Threshold> messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath);
140 if (messageFieldAssociatedPolicyThresholds != null) {
141 final Optional<Threshold> thresholdOptional = computeViolatedThreshold(
142 messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds);
143 thresholdOptional.ifPresent(threshold -> violatedThresholdsMap.put(messageFieldPath, threshold));
147 // if multiple fields have violated threshold - grab the first one with max severity
148 return violatedThresholdsMap.values().stream()
149 .sorted(Comparator.comparing(Threshold::getSeverity)).findFirst();
/**
 * Computes whether any CEF message field value violates any policy threshold. For the same policy
 * field path, thresholds are applied in order of their severity and then of their direction, and the
 * first violated threshold is returned.
 *
 * @param messageFieldValues field path values extracted from the CEF message
 * @param fieldThresholds policy thresholds for the field path
 *
 * @return Optional of the violated threshold for a field path
 */
162 private static Optional<Threshold> computeViolatedThreshold(final List<BigDecimal> messageFieldValues,
163 final List<Threshold> fieldThresholds) {
165 // sort thresholds based on severity and then based on direction
166 final List<Threshold> sortedPolicyThresholds = fieldThresholds.stream()
167 .sorted((t1, t2) -> {
168 if (t1.getSeverity().compareTo(t2.getSeverity()) != 0) {
169 return t1.getSeverity().compareTo(t2.getSeverity());
171 return t1.getDirection().compareTo(t2.getDirection());
174 .collect(Collectors.toList());
176 // Now apply each threshold to field values
177 for (final Threshold policyThreshold : sortedPolicyThresholds) {
178 for (final BigDecimal messageFieldValue : messageFieldValues) {
179 final Boolean isThresholdViolated =
180 policyThreshold.getDirection().operate(messageFieldValue,
181 new BigDecimal(policyThreshold.getThresholdValue()));
182 if (isThresholdViolated) {
183 final Threshold violatedThreshold = copyThreshold(policyThreshold);
184 violatedThreshold.setActualFieldValue(messageFieldValue);
185 return Optional.of(violatedThreshold);
189 return Optional.empty();
/**
 * Extracts the values found at the given JSON field paths in the message, using JsonPath notation.
 *
 * @param message CEF message
 * @param jsonFieldPaths JSON field paths
 *
 * @return map whose keys are JSON paths and whose values are the non-null values found at that path
 */
200 private static Map<String, List<BigDecimal>> getJsonPathValues(@Nonnull String message,
201 @Nonnull Set<String> jsonFieldPaths) {
203 final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
204 final DocumentContext documentContext = JsonPath.parse(message);
206 for (String jsonFieldPath : jsonFieldPaths) {
207 List<BigDecimal> jsonFieldValues;
210 jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
212 } catch (Exception e) {
213 final String errorMessage = String.format(
214 "Unable to convert jsonFieldPath value to valid number." +
215 "Json Path: %s.Incoming message: %s", jsonFieldPath, message);
216 throw new TcaProcessingException(errorMessage, e);
218 // If Json Field Values are present
219 if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
220 // Filter out all null values in the filed values list
221 final List<BigDecimal> nonNullValues =
222 jsonFieldValues.stream().filter(Objects::nonNull).collect(Collectors.toList());
223 // If there are non null values put them in the map
224 if (!nonNullValues.isEmpty()) {
225 jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
230 return jsonFieldPathMap;
/**
 * Creates a new threshold object by copying the values of the given threshold object.
 *
 * @param threshold threshold that needs to be copied
 *
 * @return new threshold object which is a copy of the given threshold object
 */
240 private static Threshold copyThreshold(final Threshold threshold) {
241 final Threshold copyThreshold = new Threshold();
242 copyThreshold.setClosedLoopControlName(threshold.getClosedLoopControlName());
243 copyThreshold.setClosedLoopEventStatus(threshold.getClosedLoopEventStatus());
244 copyThreshold.setVersion(threshold.getVersion());
245 copyThreshold.setFieldPath(threshold.getFieldPath());
246 copyThreshold.setThresholdValue(threshold.getThresholdValue());
247 copyThreshold.setDirection(threshold.getDirection());
248 copyThreshold.setSeverity(threshold.getSeverity());
249 return copyThreshold;
/**
 * Returns a copy of the given metrics per event name object, without copying its thresholds.
 *
 * @param metricsPerEventName metrics per event name object that needs to be copied
 *
 * @return new metrics per event name object which is a copy of the given object, with no thresholds set
 */
259 private static MetricsPerEventName copyMetricsPerEventName(final MetricsPerEventName metricsPerEventName) {
260 final MetricsPerEventName copyMetricsPerEventName = new MetricsPerEventName();
261 copyMetricsPerEventName.setEventName(metricsPerEventName.getEventName());
262 copyMetricsPerEventName.setControlLoopSchemaType(metricsPerEventName.getControlLoopSchemaType());
263 copyMetricsPerEventName.setPolicyScope(metricsPerEventName.getPolicyScope());
264 copyMetricsPerEventName.setPolicyName(metricsPerEventName.getPolicyName());
265 copyMetricsPerEventName.setPolicyVersion(metricsPerEventName.getPolicyVersion());
266 // no thresholds copied
267 return copyMetricsPerEventName;