Add support for ABATED alerts within CDAP TCA
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / flow / TCAVESCollectorFlow.java
index d880a12..4df35e7 100644 (file)
@@ -22,6 +22,7 @@ package org.openecomp.dcae.apod.analytics.cdap.tca.flow;
 
 import co.cask.cdap.api.flow.AbstractFlow;
 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsAbatementFlowlet;
 import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESAlertsSinkFlowlet;
 import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESMessageRouterFlowlet;
 import org.openecomp.dcae.apod.analytics.cdap.tca.flowlet.TCAVESThresholdViolationCalculatorFlowlet;
@@ -53,6 +54,10 @@ public class TCAVESCollectorFlow extends AbstractFlow {
                 new TCAVESThresholdViolationCalculatorFlowlet(tcaAppConfig.getTcaVESMessageStatusTableName());
         addFlowlet(thresholdViolationCalculatorFlowlet, tcaAppConfig.getThresholdCalculatorFlowletInstances());
 
+        final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet =
+                new TCAVESAlertsAbatementFlowlet(tcaAppConfig.getTcaAlertsAbatementTableName());
+        addFlowlet(tcavesAlertsAbatementFlowlet);
+
         final TCAVESAlertsSinkFlowlet alertsSinkFlowlet =
                 new TCAVESAlertsSinkFlowlet(tcaAppConfig.getTcaVESAlertsTableName());
         addFlowlet(alertsSinkFlowlet);
@@ -62,8 +67,10 @@ public class TCAVESCollectorFlow extends AbstractFlow {
         connectStream(tcaAppConfig.getTcaSubscriberOutputStreamName(), messageRouterFlowlet);
         // connect message router to VES threshold calculator
         connect(messageRouterFlowlet, thresholdViolationCalculatorFlowlet);
-        // connect VES threshold calculator flowlet to Alerts Sink Flowlet
-        connect(thresholdViolationCalculatorFlowlet, alertsSinkFlowlet);
+        // connect VES threshold calculator flowlet to Alerts Abatement Flowlet
+        connect(thresholdViolationCalculatorFlowlet, tcavesAlertsAbatementFlowlet);
+        // connect Alerts Abatement flowlet to Alerts Sink Flowlet
+        connect(tcavesAlertsAbatementFlowlet, alertsSinkFlowlet);
 
     }
 }