2  * ================================================================================
 
   3  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
 
   4  * ================================================================================
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *      http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  16  * ============LICENSE_END=========================================================
 
  20 package org.onap.dcae.analytics.tca.core.util.function.calculation;
 
  22 import com.jayway.jsonpath.DocumentContext;
 
  23 import com.jayway.jsonpath.JsonPath;
 
  24 import com.jayway.jsonpath.TypeRef;
 
  26 import java.math.BigDecimal;
 
  27 import java.util.Collections;
 
  28 import java.util.Comparator;
 
  29 import java.util.HashMap;
 
  30 import java.util.LinkedHashMap;
 
  31 import java.util.LinkedList;
 
  32 import java.util.List;
 
  34 import java.util.Objects;
 
  35 import java.util.Optional;
 
  37 import java.util.stream.Collectors;
 
  39 import javax.annotation.Nonnull;
 
  41 import org.onap.dcae.analytics.model.cef.CommonEventHeader;
 
  42 import org.onap.dcae.analytics.model.cef.Event;
 
  43 import org.onap.dcae.analytics.model.cef.EventListener;
 
  44 import org.onap.dcae.analytics.tca.core.exception.TcaProcessingException;
 
  45 import org.onap.dcae.analytics.tca.core.service.TcaExecutionContext;
 
  46 import org.onap.dcae.analytics.tca.core.service.TcaResultContext;
 
  47 import org.onap.dcae.analytics.tca.model.policy.MetricsPerEventName;
 
  48 import org.onap.dcae.analytics.tca.model.policy.TcaPolicy;
 
  49 import org.onap.dcae.analytics.tca.model.policy.Threshold;
 
  52  * @author Rajiv Singla
 
  54 public class TcaThresholdViolationCalculator implements TcaCalculationFunction {
 
  57     public TcaExecutionContext calculate(final TcaExecutionContext tcaExecutionContext) {
 
  59         final String cefMessage = tcaExecutionContext.getCefMessage();
 
  60         final EventListener eventListener = tcaExecutionContext.getTcaProcessingContext().getEventListener();
 
  61         final TcaPolicy tcaPolicy = tcaExecutionContext.getTcaPolicy();
 
  64         final String cefEventName = Optional.ofNullable(eventListener)
 
  65                 .map(EventListener::getEvent)
 
  66                 .map(Event::getCommonEventHeader)
 
  67                 .map(CommonEventHeader::getEventName)
 
  68                 .orElseThrow(() -> new TcaProcessingException("Required Field: EventName not present"));
 
  70         // Get Policy's metrics per event name matching CEF message event name
 
  71         final MetricsPerEventName policyMetricsPerEventName =
 
  72                 tcaPolicy.getMetricsPerEventName().stream()
 
  73                         .filter(m -> m.getEventName().equalsIgnoreCase(cefEventName))
 
  74                         .findFirst().orElseThrow(() ->
 
  75                         new TcaProcessingException("Required Field: MetricsPerEventName not present"));
 
  78         // get violated policy threshold for cef event name sorted by severity
 
  79         final Optional<Threshold> thresholdOptional =
 
  80                 getViolatedThreshold(policyMetricsPerEventName.getThresholds(), cefMessage);
 
  83         // Check if threshold violation is present
 
  84         if (!thresholdOptional.isPresent()) {
 
  85             final String earlyTerminationMessage = "No Policy Threshold violation detected in CEF Message";
 
  86             setTerminatingMessage(earlyTerminationMessage, tcaExecutionContext, false);
 
  87             return tcaExecutionContext;
 
  91         // Threshold violations are present - update tca processing result context
 
  92         final MetricsPerEventName violatedMetricsPerEventName = copyMetricsPerEventName(policyMetricsPerEventName);
 
  93         final Threshold violatedThreshold = thresholdOptional.get();
 
  94         violatedMetricsPerEventName.setThresholds(Collections.singletonList(violatedThreshold));
 
  95         final TcaResultContext tcaResultContext =
 
  96                 tcaExecutionContext.getTcaResultContext();
 
  97         tcaResultContext.setViolatedMetricsPerEventName(violatedMetricsPerEventName);
 
  99         return tcaExecutionContext;
 
 104      * Provides violated threshold
 
 106      * @param policyThresholds policy thresholds that need to applied to incoming cef message
 
 107      * @param cefMessage incoming cef message
 
 109      * @return list of all violated threshold
 
 111     private static Optional<Threshold> getViolatedThreshold(final List<Threshold> policyThresholds,
 
 112                                                             final String cefMessage) {
 
 114         // map containing key as field path and associated policy thresholds
 
 115         final Map<String, List<Threshold>> policyFieldPathsMap = new LinkedHashMap<>();
 
 116         for (final Threshold policyThreshold : policyThresholds) {
 
 117             if (policyFieldPathsMap.get(policyThreshold.getFieldPath()) == null) {
 
 118                 final LinkedList<Threshold> policyThresholdList = new LinkedList<>();
 
 119                 policyThresholdList.add(policyThreshold);
 
 120                 policyFieldPathsMap.put(policyThreshold.getFieldPath(), policyThresholdList);
 
 122                 policyFieldPathsMap.get(policyThreshold.getFieldPath()).add(policyThreshold);
 
 126         // get map containing key as field path and values for json path
 
 127         final Map<String, List<BigDecimal>> messageFieldValuesMap =
 
 128                 getJsonPathValues(cefMessage, policyFieldPathsMap.keySet());
 
 130         // if no matching path values - assuming no threshold violations
 
 131         if (messageFieldValuesMap.isEmpty()) {
 
 132             return Optional.empty();
 
 135         // Determine all violated thresholds per message field Path
 
 136         final Map<String, Threshold> violatedThresholdsMap = new HashMap<>();
 
 137         for (Map.Entry<String, List<BigDecimal>> messageFieldValuesMapEntry : messageFieldValuesMap.entrySet()) {
 
 138             final String messageFieldPath = messageFieldValuesMapEntry.getKey();
 
 139             final List<Threshold> messageFieldAssociatedPolicyThresholds = policyFieldPathsMap.get(messageFieldPath);
 
 140             if (messageFieldAssociatedPolicyThresholds != null) {
 
 141                 final Optional<Threshold> thresholdOptional = computeViolatedThreshold(
 
 142                         messageFieldValuesMapEntry.getValue(), messageFieldAssociatedPolicyThresholds);
 
 143                 thresholdOptional.ifPresent(threshold -> violatedThresholdsMap.put(messageFieldPath, threshold));
 
 147         // if multiple fields have violated threshold - grab the first one with max severity
 
 148         return violatedThresholdsMap.values().stream()
 
 149                 .sorted(Comparator.comparing(Threshold::getSeverity)).findFirst();
 
 154      * Computes if any CEF Message Fields have violated any Policy Thresholds. For the same policy field path
 
 155      * it applies threshold in order of their severity and then by direction and returns first violated threshold
 
 157      * @param messageFieldValues Field Path Values extracted from CEF Message
 
 158      * @param fieldThresholds Policy Thresholds for Field Path
 
 160      * @return Optional of violated threshold for a field path
 
 162     private static Optional<Threshold> computeViolatedThreshold(final List<BigDecimal> messageFieldValues,
 
 163                                                                 final List<Threshold> fieldThresholds) {
 
 165         // sort thresholds based on severity and then based on direction
 
 166         final List<Threshold> sortedPolicyThresholds = fieldThresholds.stream()
 
 167                 .sorted((t1, t2) -> {
 
 168                     if (t1.getSeverity().compareTo(t2.getSeverity()) != 0) {
 
 169                         return t1.getSeverity().compareTo(t2.getSeverity());
 
 171                         return t1.getDirection().compareTo(t2.getDirection());
 
 174                 .collect(Collectors.toList());
 
 176         // Now apply each threshold to field values
 
 177         for (final Threshold policyThreshold : sortedPolicyThresholds) {
 
 178             for (final BigDecimal messageFieldValue : messageFieldValues) {
 
 179                 final Boolean isThresholdViolated =
 
 180                         policyThreshold.getDirection().operate(messageFieldValue,
 
 181                                 new BigDecimal(policyThreshold.getThresholdValue()));
 
 182                 if (isThresholdViolated) {
 
 183                     final Threshold violatedThreshold = copyThreshold(policyThreshold);
 
 184                     violatedThreshold.setActualFieldValue(messageFieldValue);
 
 185                     return Optional.of(violatedThreshold);
 
 189         return Optional.empty();
 
 193      * Extracts json path values for given json Field Path from using Json path notation.
 
 195      * @param message CEF Message
 
 196      * @param jsonFieldPaths Json Field Paths
 
 198      * @return Map containing key as json path and values as values associated with that json path
 
 200     private static Map<String, List<BigDecimal>> getJsonPathValues(@Nonnull String message,
 
 201                                                                    @Nonnull Set<String> jsonFieldPaths) {
 
 203         final Map<String, List<BigDecimal>> jsonFieldPathMap = new HashMap<>();
 
 204         final DocumentContext documentContext = JsonPath.parse(message);
 
 206         for (String jsonFieldPath : jsonFieldPaths) {
 
 207             List<BigDecimal> jsonFieldValues;
 
 210                 jsonFieldValues = documentContext.read(jsonFieldPath, new TypeRef<List<BigDecimal>>() {
 
 212             } catch (Exception e) {
 
 213                 final String errorMessage = String.format(
 
 214                         "Unable to convert jsonFieldPath value to valid number." +
 
 215                                 "Json Path: %s.Incoming message: %s", jsonFieldPath, message);
 
 216                 throw new TcaProcessingException(errorMessage, e);
 
 218             // If Json Field Values are present
 
 219             if (jsonFieldValues != null && !jsonFieldValues.isEmpty()) {
 
 220                 // Filter out all null values in the filed values list
 
 221                 final List<BigDecimal> nonNullValues =
 
 222                         jsonFieldValues.stream().filter(Objects::nonNull).collect(Collectors.toList());
 
 223                 // If there are non null values put them in the map
 
 224                 if (!nonNullValues.isEmpty()) {
 
 225                     jsonFieldPathMap.put(jsonFieldPath, nonNullValues);
 
 230         return jsonFieldPathMap;
 
 234      * Creates a new threshold hold object by copying the value of given threshold object
 
 236      * @param threshold threshold that needs to be copied
 
 238      * @return new threshold object which is copy of given threshold object
 
 240     private static Threshold copyThreshold(final Threshold threshold) {
 
 241         final Threshold copyThreshold = new Threshold();
 
 242         copyThreshold.setClosedLoopControlName(threshold.getClosedLoopControlName());
 
 243         copyThreshold.setClosedLoopEventStatus(threshold.getClosedLoopEventStatus());
 
 244         copyThreshold.setVersion(threshold.getVersion());
 
 245         copyThreshold.setFieldPath(threshold.getFieldPath());
 
 246         copyThreshold.setThresholdValue(threshold.getThresholdValue());
 
 247         copyThreshold.setDirection(threshold.getDirection());
 
 248         copyThreshold.setSeverity(threshold.getSeverity());
 
 249         return copyThreshold;
 
 253      * Returns a copy of metric Per event name without copying thresholds
 
 255      * @param metricsPerEventName metric per event name that needs to be copied
 
 257      * @return new metric per event name object which is copy of given object
 
 259     private static MetricsPerEventName copyMetricsPerEventName(final MetricsPerEventName metricsPerEventName) {
 
 260         final MetricsPerEventName copyMetricsPerEventName = new MetricsPerEventName();
 
 261         copyMetricsPerEventName.setEventName(metricsPerEventName.getEventName());
 
 262         copyMetricsPerEventName.setControlLoopSchemaType(metricsPerEventName.getControlLoopSchemaType());
 
 263         copyMetricsPerEventName.setPolicyScope(metricsPerEventName.getPolicyScope());
 
 264         copyMetricsPerEventName.setPolicyName(metricsPerEventName.getPolicyName());
 
 265         copyMetricsPerEventName.setPolicyVersion(metricsPerEventName.getPolicyVersion());
 
 266         // no thresholds copied
 
 267         return copyMetricsPerEventName;