2 * ============LICENSE_START=======================================================
3 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
4 * Copyright (c) 2022 Wipro Limited Intellectual Property. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
21 package org.onap.dcae.analytics.tca.core.util.function.calculation;
23 import com.jayway.jsonpath.DocumentContext;
24 import com.jayway.jsonpath.JsonPath;
25 import com.jayway.jsonpath.TypeRef;
27 import java.math.BigDecimal;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.HashMap;
31 import java.util.LinkedHashMap;
32 import java.util.LinkedList;
33 import java.util.List;
35 import java.util.Objects;
36 import java.util.Optional;
38 import java.util.stream.Collectors;
40 import javax.annotation.Nonnull;
42 import org.onap.dcae.analytics.model.cef.CommonEventHeader;
43 import org.onap.dcae.analytics.model.cef.Domain;
44 import org.onap.dcae.analytics.model.cef.Event;
45 import org.onap.dcae.analytics.model.cef.EventListener;
46 import org.onap.dcae.analytics.tca.core.exception.TcaProcessingException;
47 import org.onap.dcae.analytics.tca.core.service.TcaExecutionContext;
48 import org.onap.dcae.analytics.tca.core.service.TcaResultContext;
49 import org.onap.dcae.analytics.tca.model.policy.MetricsPerEventName;
50 import org.onap.dcae.analytics.tca.model.policy.TcaPolicy;
51 import org.onap.dcae.analytics.tca.model.policy.Threshold;
54 * @author Rajiv Singla
56 public class TcaThresholdViolationCalculator implements TcaCalculationFunction {
59 public TcaExecutionContext calculate(final TcaExecutionContext tcaExecutionContext) {
61 final String cefMessage = tcaExecutionContext.getCefMessage();
62 final EventListener eventListener = tcaExecutionContext.getTcaProcessingContext().getEventListener();
63 final List<TcaPolicy> tcaPolicy = tcaExecutionContext.getTcaPolicy();
64 TcaPolicy tcaPolicyFinal = null;
67 final String cefEventName = Optional.ofNullable(eventListener)
68 .map(EventListener::getEvent)
69 .map(Event::getCommonEventHeader)
70 .map(CommonEventHeader::getEventName)
71 .orElseThrow(() -> new TcaProcessingException("Required Field: EventName not present"));
73 final Optional<Domain> domainName = Optional.ofNullable(eventListener)
74 .map(EventListener::getEvent)
75 .map(Event::getCommonEventHeader)
76 .map(CommonEventHeader::getDomain);
78 final String cefMessageDomain = domainName.get().name();
80 for(TcaPolicy tca : tcaPolicy){
81 if (tca.getDomain().equalsIgnoreCase(cefMessageDomain)){
89 // Get Policy's metrics per event name matching CEF message event name
90 final MetricsPerEventName policyMetricsPerEventName =
91 tcaPolicyFinal.getMetricsPerEventName().stream()
92 .filter(m -> m.getEventName().equalsIgnoreCase(cefEventName))
93 .findFirst().orElseThrow(() ->
94 new TcaProcessingException("Required Field: MetricsPerEventName not present"));
96 // get violated policy threshold for cef event name sorted by severity
97 final Optional<Threshold> thresholdOptional =
98 getViolatedThreshold(policyMetricsPerEventName.getThresholds(), cefMessage);
101 // Check if threshold violation is present
102 if (!thresholdOptional.isPresent()) {
103 final String earlyTerminationMessage = "No Policy Threshold violation detected in CEF Message";
104 setTerminatingMessage(earlyTerminationMessage, tcaExecutionContext, false);
105 return tcaExecutionContext;
109 // Threshold violations are present - update tca processing result context
110 final MetricsPerEventName violatedMetricsPerEventName = copyMetricsPerEventName(policyMetricsPerEventName);
111 final Threshold violatedThreshold = thresholdOptional.get();
112 violatedMetricsPerEventName.setThresholds(Collections.singletonList(violatedThreshold));
113 final TcaResultContext tcaResultContext =
114 tcaExecutionContext.getTcaResultContext();
115 tcaResultContext.setViolatedMetricsPerEventName(violatedMetricsPerEventName);
117 return tcaExecutionContext;
122 * Provides violated threshold
124 * @param policyThresholds policy thresholds that need to applied to incoming cef message
125 * @param cefMessage incoming cef message
127 * @return list of all violated threshold
129 private static Optional<Threshold> getViolatedThreshold(final List<Threshold> policyThresholds,
130 final String cefMessage) {
132 // map containing key as field path and associated policy thresholds
133 final Map<String, List<Threshold>> policyFieldPathsMap = new LinkedHashMap<>();
134 for (final Threshold policyThreshold : policyThresholds) {
135 if (policyFieldPathsMap.get(policyThreshold.getFieldPath()) == null) {
136 final LinkedList<Threshold> policyThresholdList = new LinkedList<>();
137 policyThresholdList.add(policyThreshold);
138 policyFieldPathsMap.put(policyThreshold.getFieldPath(), policyThresholdList);
140 policyFieldPathsMap.get(policyThreshold.getFieldPath()).add(policyThreshold);
144 // get map containing key as field path and values for json path
145 final Map<String, List<BigDecimal>> messageFieldValuesMap =
146 getJsonPathValues(cefMessage, policyFieldPathsMap.keySet());
148 // if no matching path values - assuming no threshold violations
149 if (messageFieldValuesMap.isEmpty()) {
150 return Optional.empty();
153 // Determine all violated thresholds per message field Path
154 final Map<String, Threshold> violatedThresholdsMap = new HashMap<>();
155 for (Map.Entry<String, List<BigDecimal>> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) {
156 final String messageFieldPath = messageFieldValuesMapEntry.getKey();
157 final List<Threshold> messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath);
158 if (messageFieldAssociatedPolicyThresholds != null) {
159 final Optional<Threshold> thresholdOptional = computeViolatedThreshold(
160 messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds);
161 thresholdOptional.ifPresent(threshold -> violatedThresholdsMap.put(messageFieldPath, threshold));
165 // if multiple fields have violated threshold - grab the first one with max severity
166 return violatedThresholdsMap.values().stream()
167 .sorted(Comparator.comparing(Threshold::getSeverity)).findFirst();
172 * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
173 * it applies threshold in order of their severity and then by direction and returns first violated threshold
175 * @param messageFieldValues Field Path Values extracted from CEF Message
176 * @param fieldThresholds Policy Thresholds for Field Path
178 * @return Optional of violated threshold for a field path
180 private static Optional<Threshold> computeViolatedThreshold(final List<BigDecimal> messageFieldValues,
181 final List<Threshold> fieldThresholds) {
183 // sort thresholds based on severity and then based on direction
184 final List<Threshold> sortedPolicyThresholds = fieldThresholds.stream()
185 .sorted((t1, t2) -> {
186 if (t1.getSeverity().compareTo(t2.getSeverity()) != 0) {
187 return t1.getSeverity().compareTo(t2.getSeverity());
189 return t1.getDirection().compareTo(t2.getDirection());
192 .collect(Collectors.toList());
194 // Now apply each threshold to field values
195 for (final Threshold policyThreshold : sortedPolicyThresholds) {
196 for (final BigDecimal messageFieldValue : messageFieldValues) {
197 final Boolean isThresholdViolated =
198 policyThreshold.getDirection().operate(messageFieldValue,
199 new BigDecimal(policyThreshold.getThresholdValue()));
200 if (isThresholdViolated) {
201 final Threshold violatedThreshold = copyThreshold(policyThreshold);
202 violatedThreshold.setActualFieldValue(messageFieldValue);
203 return Optional.of(violatedThreshold);
207 return Optional.empty();
211 * Extracts json path values for given json Field Path from using Json path notation.
213 * @param message CEF Message
214 * @param jsonFieldPaths Json Field Paths
216 * @return Map containing key as json path and values as values associated with that json path
218 private static Map<String, List<BigDecimal>> getJsonPathValues(@Nonnull String message,
219 @Nonnull Set<String> jsonFieldPaths) {
221 final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
222 final DocumentContext documentContext = JsonPath.parse(message);
224 for (String jsonFieldPath : jsonFieldPaths) {
225 List<BigDecimal> jsonFieldValues;
228 jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
230 } catch (Exception e) {
231 final String errorMessage = String.format(
232 "Unable to convert jsonFieldPath value to valid number." +
233 "Json Path: %s.Incoming message: %s", jsonFieldPath, message);
234 throw new TcaProcessingException(errorMessage, e);
236 // If Json Field Values are present
237 if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
238 // Filter out all null values in the filed values list
239 final List<BigDecimal> nonNullValues =
240 jsonFieldValues.stream().filter(Objects::nonNull).collect(Collectors.toList());
241 // If there are non null values put them in the map
242 if (!nonNullValues.isEmpty()) {
243 jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
247 return jsonFieldPathMap;
251 * Creates a new threshold hold object by copying the value of given threshold object
253 * @param threshold threshold that needs to be copied
255 * @return new threshold object which is copy of given threshold object
257 private static Threshold copyThreshold(final Threshold threshold) {
258 final Threshold copyThreshold = new Threshold();
259 copyThreshold.setClosedLoopControlName(threshold.getClosedLoopControlName());
260 copyThreshold.setClosedLoopEventStatus(threshold.getClosedLoopEventStatus());
261 copyThreshold.setVersion(threshold.getVersion());
262 copyThreshold.setFieldPath(threshold.getFieldPath());
263 copyThreshold.setThresholdValue(threshold.getThresholdValue());
264 copyThreshold.setDirection(threshold.getDirection());
265 copyThreshold.setSeverity(threshold.getSeverity());
266 return copyThreshold;
270 * Returns a copy of metric Per event name without copying thresholds
272 * @param metricsPerEventName metric per event name that needs to be copied
274 * @return new metric per event name object which is copy of given object
276 private static MetricsPerEventName copyMetricsPerEventName(final MetricsPerEventName metricsPerEventName) {
277 final MetricsPerEventName copyMetricsPerEventName = new MetricsPerEventName();
278 copyMetricsPerEventName.setEventName(metricsPerEventName.getEventName());
279 copyMetricsPerEventName.setControlLoopSchemaType(metricsPerEventName.getControlLoopSchemaType());
280 copyMetricsPerEventName.setPolicyScope(metricsPerEventName.getPolicyScope());
281 copyMetricsPerEventName.setPolicyName(metricsPerEventName.getPolicyName());
282 copyMetricsPerEventName.setPolicyVersion(metricsPerEventName.getPolicyVersion());
283 // no thresholds copied
284 return copyMetricsPerEventName;