Standalone TCA with EELF Logger
[dcaegen2/analytics/tca-gen2.git] / dcae-analytics / dcae-analytics-tca-core / src / main / java / org / onap / dcae / analytics / tca / core / util / function / calculation / TcaThresholdViolationCalculator.java
1 /*
2  * ================================================================================
3  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  *
18  */
19
20 package org.onap.dcae.analytics.tca.core.util.function.calculation;
21
22 import com.jayway.jsonpath.DocumentContext;
23 import com.jayway.jsonpath.JsonPath;
24 import com.jayway.jsonpath.TypeRef;
25
26 import java.math.BigDecimal;
27 import java.util.Collections;
28 import java.util.Comparator;
29 import java.util.HashMap;
30 import java.util.LinkedHashMap;
31 import java.util.LinkedList;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Objects;
35 import java.util.Optional;
36 import java.util.Set;
37 import java.util.stream.Collectors;
38
39 import javax.annotation.Nonnull;
40
41 import org.onap.dcae.analytics.model.cef.CommonEventHeader;
42 import org.onap.dcae.analytics.model.cef.Event;
43 import org.onap.dcae.analytics.model.cef.EventListener;
44 import org.onap.dcae.analytics.tca.core.exception.TcaProcessingException;
45 import org.onap.dcae.analytics.tca.core.service.TcaExecutionContext;
46 import org.onap.dcae.analytics.tca.core.service.TcaResultContext;
47 import org.onap.dcae.analytics.tca.model.policy.MetricsPerEventName;
48 import org.onap.dcae.analytics.tca.model.policy.TcaPolicy;
49 import org.onap.dcae.analytics.tca.model.policy.Threshold;
50
51 /**
52  * @author Rajiv Singla
53  */
54 public class TcaThresholdViolationCalculator implements TcaCalculationFunction {
55
56     @Override
57     public TcaExecutionContext calculate(final TcaExecutionContext tcaExecutionContext) {
58
59         final String cefMessage = tcaExecutionContext.getCefMessage();
60         final EventListener eventListener = tcaExecutionContext.getTcaProcessingContext().getEventListener();
61         final TcaPolicy tcaPolicy = tcaExecutionContext.getTcaPolicy();
62
63         // Get CEF Event Name
64         final String cefEventName = Optional.ofNullable(eventListener)
65                 .map(EventListener::getEvent)
66                 .map(Event::getCommonEventHeader)
67                 .map(CommonEventHeader::getEventName)
68                 .orElseThrow(() -> new TcaProcessingException("Required Field: EventName not present"));
69
70         // Get Policy's metrics per event name matching CEF message event name
71         final MetricsPerEventName policyMetricsPerEventName =
72                 tcaPolicy.getMetricsPerEventName().stream()
73                         .filter(m -> m.getEventName().equalsIgnoreCase(cefEventName))
74                         .findFirst().orElseThrow(() ->
75                         new TcaProcessingException("Required Field: MetricsPerEventName not present"));
76
77
78         // get violated policy threshold for cef event name sorted by severity
79         final Optional<Threshold> thresholdOptional =
80                 getViolatedThreshold(policyMetricsPerEventName.getThresholds(), cefMessage);
81
82
83         // Check if threshold violation is present
84         if (!thresholdOptional.isPresent()) {
85             final String earlyTerminationMessage = "No Policy Threshold violation detected in CEF Message";
86             setTerminatingMessage(earlyTerminationMessage, tcaExecutionContext, false);
87             return tcaExecutionContext;
88         }
89
90
91         // Threshold violations are present - update tca processing result context
92         final MetricsPerEventName violatedMetricsPerEventName = copyMetricsPerEventName(policyMetricsPerEventName);
93         final Threshold violatedThreshold = thresholdOptional.get();
94         violatedMetricsPerEventName.setThresholds(Collections.singletonList(violatedThreshold));
95         final TcaResultContext tcaResultContext =
96                 tcaExecutionContext.getTcaResultContext();
97         tcaResultContext.setViolatedMetricsPerEventName(violatedMetricsPerEventName);
98
99         return tcaExecutionContext;
100     }
101
102
103     /**
104      * Provides violated threshold
105      *
106      * @param policyThresholds policy thresholds that need to applied to incoming cef message
107      * @param cefMessage incoming cef message
108      *
109      * @return list of all violated threshold
110      */
111     private static Optional<Threshold> getViolatedThreshold(final List<Threshold> policyThresholds,
112                                                             final String cefMessage) {
113
114         // map containing key as field path and associated policy thresholds
115         final Map<String, List<Threshold>> policyFieldPathsMap = new LinkedHashMap<>();
116         for (final Threshold policyThreshold : policyThresholds) {
117             if (policyFieldPathsMap.get(policyThreshold.getFieldPath()) == null) {
118                 final LinkedList<Threshold> policyThresholdList = new LinkedList<>();
119                 policyThresholdList.add(policyThreshold);
120                 policyFieldPathsMap.put(policyThreshold.getFieldPath(), policyThresholdList);
121             } else {
122                 policyFieldPathsMap.get(policyThreshold.getFieldPath()).add(policyThreshold);
123             }
124         }
125
126         // get map containing key as field path and values for json path
127         final Map<String, List<BigDecimal>> messageFieldValuesMap =
128                 getJsonPathValues(cefMessage, policyFieldPathsMap.keySet());
129
130         // if no matching path values - assuming no threshold violations
131         if (messageFieldValuesMap.isEmpty()) {
132             return Optional.empty();
133         }
134
135         // Determine all violated thresholds per message field Path
136         final Map<String, Threshold> violatedThresholdsMap = new HashMap<>();
137         for (Map.Entry<String, List<BigDecimal>> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) {
138             final String messageFieldPath = messageFieldValuesMapEntry.getKey();
139             final List<Threshold> messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath);
140             if (messageFieldAssociatedPolicyThresholds != null) {
141                 final Optional<Threshold> thresholdOptional = computeViolatedThreshold(
142                         messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds);
143                 thresholdOptional.ifPresent(threshold -> violatedThresholdsMap.put(messageFieldPath, threshold));
144             }
145         }
146
147         // if multiple fields have violated threshold - grab the first one with max severity
148         return violatedThresholdsMap.values().stream()
149                 .sorted(Comparator.comparing(Threshold::getSeverity)).findFirst();
150
151     }
152
153     /**
154      * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
155      * it applies threshold in order of their severity and then by direction and returns first violated threshold
156      *
157      * @param messageFieldValues Field Path Values extracted from CEF Message
158      * @param fieldThresholds Policy Thresholds for Field Path
159      *
160      * @return Optional of violated threshold for a field path
161      */
162     private static Optional<Threshold> computeViolatedThreshold(final List<BigDecimal> messageFieldValues,
163                                                                 final List<Threshold> fieldThresholds) {
164
165         // sort thresholds based on severity and then based on direction
166         final List<Threshold> sortedPolicyThresholds = fieldThresholds.stream()
167                 .sorted((t1, t2) -> {
168                     if (t1.getSeverity().compareTo(t2.getSeverity()) != 0) {
169                         return t1.getSeverity().compareTo(t2.getSeverity());
170                     } else {
171                         return t1.getDirection().compareTo(t2.getDirection());
172                     }
173                 })
174                 .collect(Collectors.toList());
175
176         // Now apply each threshold to field values
177         for (final Threshold policyThreshold : sortedPolicyThresholds) {
178             for (final BigDecimal messageFieldValue : messageFieldValues) {
179                 final Boolean isThresholdViolated =
180                         policyThreshold.getDirection().operate(messageFieldValue,
181                                 new BigDecimal(policyThreshold.getThresholdValue()));
182                 if (isThresholdViolated) {
183                     final Threshold violatedThreshold = copyThreshold(policyThreshold);
184                     violatedThreshold.setActualFieldValue(messageFieldValue);
185                     return Optional.of(violatedThreshold);
186                 }
187             }
188         }
189         return Optional.empty();
190     }
191
192     /**
193      * Extracts json path values for given json Field Path from using Json path notation.
194      *
195      * @param message CEF Message
196      * @param jsonFieldPaths Json Field Paths
197      *
198      * @return Map containing key as json path and values as values associated with that json path
199      */
200     private static Map<String, List<BigDecimal>> getJsonPathValues(@Nonnull String message,
201                                                                    @Nonnull Set<String> jsonFieldPaths) {
202
203         final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
204         final DocumentContext documentContext = JsonPath.parse(message);
205
206         for (String jsonFieldPath : jsonFieldPaths) {
207             List<BigDecimal> jsonFieldValues;
208
209             try {
210                 jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
211                 });
212             } catch (Exception e) {
213                 final String errorMessage = String.format(
214                         "Unable to convert jsonFieldPath value to valid number." +
215                                 "Json Path: %s.Incoming message: %s", jsonFieldPath, message);
216                 throw new TcaProcessingException(errorMessage, e);
217             }
218             // If Json Field Values are present
219             if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
220                 // Filter out all null values in the filed values list
221                 final List<BigDecimal> nonNullValues =
222                         jsonFieldValues.stream().filter(Objects::nonNull).collect(Collectors.toList());
223                 // If there are non null values put them in the map
224                 if (!nonNullValues.isEmpty()) {
225                     jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
226                 }
227             }
228         }
229
230         return jsonFieldPathMap;
231     }
232
233     /**
234      * Creates a new threshold hold object by copying the value of given threshold object
235      *
236      * @param threshold threshold that needs to be copied
237      *
238      * @return new threshold object which is copy of given threshold object
239      */
240     private static Threshold copyThreshold(final Threshold threshold) {
241         final Threshold copyThreshold = new Threshold();
242         copyThreshold.setClosedLoopControlName(threshold.getClosedLoopControlName());
243         copyThreshold.setClosedLoopEventStatus(threshold.getClosedLoopEventStatus());
244         copyThreshold.setVersion(threshold.getVersion());
245         copyThreshold.setFieldPath(threshold.getFieldPath());
246         copyThreshold.setThresholdValue(threshold.getThresholdValue());
247         copyThreshold.setDirection(threshold.getDirection());
248         copyThreshold.setSeverity(threshold.getSeverity());
249         return copyThreshold;
250     }
251
252     /**
253      * Returns a copy of metric Per event name without copying thresholds
254      *
255      * @param metricsPerEventName metric per event name that needs to be copied
256      *
257      * @return new metric per event name object which is copy of given object
258      */
259     private static MetricsPerEventName copyMetricsPerEventName(final MetricsPerEventName metricsPerEventName) {
260         final MetricsPerEventName copyMetricsPerEventName = new MetricsPerEventName();
261         copyMetricsPerEventName.setEventName(metricsPerEventName.getEventName());
262         copyMetricsPerEventName.setControlLoopSchemaType(metricsPerEventName.getControlLoopSchemaType());
263         copyMetricsPerEventName.setPolicyScope(metricsPerEventName.getPolicyScope());
264         copyMetricsPerEventName.setPolicyName(metricsPerEventName.getPolicyName());
265         copyMetricsPerEventName.setPolicyVersion(metricsPerEventName.getPolicyVersion());
266         // no thresholds copied
267         return copyMetricsPerEventName;
268     }
269
270 }