Add support for ABATED alerts within CDAP TCA
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / flowlet / TCAVESThresholdViolationCalculatorFlowlet.java
index b8460dc..a3a35cf 100644 (file)
 
 package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
 
+import co.cask.cdap.api.annotation.HashPartition;
 import co.cask.cdap.api.annotation.Output;
 import co.cask.cdap.api.annotation.ProcessInput;
 import co.cask.cdap.api.annotation.Property;
-import co.cask.cdap.api.annotation.RoundRobin;
 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
 import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
 import co.cask.cdap.api.flow.flowlet.FlowletContext;
@@ -32,10 +32,12 @@ import co.cask.cdap.api.metrics.Metrics;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCACalculatorMessageType;
 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAMessageStatusEntity;
 import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
 import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.TCAPolicy;
 import org.openecomp.dcae.apod.analytics.tca.processor.TCACEFProcessorContext;
 import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
@@ -54,7 +56,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
     private static final Logger LOG = LoggerFactory.getLogger(TCAVESThresholdViolationCalculatorFlowlet.class);
 
     @Output(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
-    protected OutputEmitter<String> tcaAlertOutputEmitter;
+    protected OutputEmitter<ThresholdCalculatorOutput> tcaAlertOutputEmitter;
     protected Metrics metrics;
     private ObjectMappedTable<TCAMessageStatusEntity> vesMessageStatusTable;
 
@@ -100,7 +102,7 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
      * @throws JsonProcessingException if alert message cannot be parsed into JSON object
      */
     @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_MESSAGE_ROUTER_OUTPUT)
-    @RoundRobin
+    @HashPartition(AnalyticsConstants.TCA_VES_MESSAGE_ROUTER_PARTITION_KEY)
     public void filterVESMessages(String vesMessage) throws JsonProcessingException {
 
         TCACalculatorMessageType calculatorMessageType = TCACalculatorMessageType.INAPPLICABLE;
@@ -127,7 +129,12 @@ public class TCAVESThresholdViolationCalculatorFlowlet extends AbstractFlowlet {
                 metrics.count(CDAPMetricsConstants.TCA_VES_NON_COMPLIANT_MESSAGES_METRIC, 1);
 
                 // Step 4: Emit message to Alert Sink Flowlet
-                tcaAlertOutputEmitter.emit(alertMessage);
+                final ThresholdCalculatorOutput thresholdCalculatorOutput =
+                        new ThresholdCalculatorOutput(processorContext.getMessage(),
+                                TCAUtils.writeValueAsString(processorContext.getTCAPolicy()),
+                                TCAUtils.writeValueAsString(processorContextWithViolations.getMetricsPerEventName()),
+                                alertMessage);
+                tcaAlertOutputEmitter.emit(thresholdCalculatorOutput);
 
             } else {