TCA: Support for VES/A&AI enrichment
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / test / java / org / openecomp / dcae / apod / analytics / cdap / tca / flowlet / TCAVESAlertsAbatementFlowletTest.java
index baf42cc..e0c6d06 100644 (file)
-package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
-
-import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
-import co.cask.cdap.api.flow.flowlet.FlowletContext;
-import co.cask.cdap.api.flow.flowlet.OutputEmitter;
-import com.google.common.collect.ImmutableList;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
-import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
-import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
-import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
-import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
-import org.openecomp.dcae.apod.analytics.cdap.tca.BaseAnalyticsCDAPTCAUnitTest;
-import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
-import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
-
-import java.util.Date;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Author: rs153v (Rajiv Singla) . Creation Date: 9/12/2017.
- */
-public class TCAVESAlertsAbatementFlowletTest extends BaseAnalyticsCDAPTCAUnitTest {
-
-    private static final TCAPolicyPreferences sampleTCAPolicyPreferences = getSampleTCAPolicyPreferences();
-    private static final List<MetricsPerEventName> metricsPerEventNames = sampleTCAPolicyPreferences
-            .getMetricsPerEventName();
-    private final OutputEmitter<String> mockOutputEmitter = mock(OutputEmitter.class);
-
-    private class TestTCAVESAlertsAbatementFlowlet extends TCAVESAlertsAbatementFlowlet {
-
-        public TestTCAVESAlertsAbatementFlowlet(String tcaAlertsAbatementTableName) {
-            super(tcaAlertsAbatementTableName);
-            this.alertsAbatementOutputEmitter = mockOutputEmitter;
-            doNothing().when(mockOutputEmitter).emit(any(String.class));
-        }
-    }
-
-    @Test
-    public void testConfigure() throws Exception {
-        final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet =
-                new TCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");
-        assertFlowletNameAndDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET,
-                CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET,
-                tcavesAlertsAbatementFlowlet);
-    }
-
-    @Test(expected = CDAPSettingsException.class)
-    public void testDetermineAbatementAlertsWhenViolatedMetricsEventNameIsBlank() throws Exception {
-
-        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =
-                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");
-        final Threshold violatedThreshold = getViolatedThreshold(ControlLoopEventStatus.ONSET);
-        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =
-                getMockThresholdCalculatorOutput(violatedThreshold);
-        when(mockThresholdCalculatorOutput.getViolatedMetricsPerEventName()).thenReturn("");
-
-        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);
-    }
-
-    @Test
-    public void testDetermineAbatementAlertsWhenControlLoopTypeIsONSET() throws Exception {
-
-        final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName";
-        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =
-                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");
-
-        final FlowletContext mockFlowletContext = mock(FlowletContext.class);
-        final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);
-        when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);
-        tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);
-
-        doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));
-
-        final Threshold violatedThreshold = getViolatedThreshold(ControlLoopEventStatus.ONSET);
-        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =
-                getMockThresholdCalculatorOutput(violatedThreshold);
-
-        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);
-        verify(mockObjectMappedTable,
-                times(1)).write(any(String.class), any(TCAAlertsAbatementEntity.class));
-        verify(mockOutputEmitter, times(1)).emit(any(String.class));
-
-    }
-
-
-    @Test
-    public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndNoPreviousAlertWasSent() throws Exception {
-
-        final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName";
-        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =
-                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");
-
-        final FlowletContext mockFlowletContext = mock(FlowletContext.class);
-        final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);
-        when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);
-        tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);
-
-        doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));
-        final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = mock(TCAAlertsAbatementEntity.class);
-        when(mockObjectMappedTable.read(any(String.class))).thenReturn(tcaAlertsAbatementEntity);
-        when(tcaAlertsAbatementEntity.getAbatementSentTS()).thenReturn(null);
-
-        final Threshold violatedThreshold = getViolatedThreshold(ControlLoopEventStatus.ABATED);
-        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =
-                getMockThresholdCalculatorOutput(violatedThreshold);
-
-        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);
-        verify(mockObjectMappedTable,
-                times(1)).write(any(String.class), any(TCAAlertsAbatementEntity.class));
-        verify(mockOutputEmitter, times(1)).emit(any(String.class));
-
-    }
-
-    @Test
-    public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndPreviousAlertWasAlreadySent() throws
-            Exception {
-
-        final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName";
-        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =
-                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");
-
-        final FlowletContext mockFlowletContext = mock(FlowletContext.class);
-        final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);
-        when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);
-        tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);
-
-        doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));
-        final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = mock(TCAAlertsAbatementEntity.class);
-        when(mockObjectMappedTable.read(any(String.class))).thenReturn(tcaAlertsAbatementEntity);
-        final long time = new Date().getTime();
-        when(tcaAlertsAbatementEntity.getAbatementSentTS()).thenReturn(Long.toString(time));
-
-        final Threshold violatedThreshold = getViolatedThreshold(ControlLoopEventStatus.ABATED);
-        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =
-                getMockThresholdCalculatorOutput(violatedThreshold);
-
-        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);
-        verify(mockObjectMappedTable,
-                times(0)).write(any(String.class), any(TCAAlertsAbatementEntity.class));
-        verify(mockOutputEmitter, times(0)).emit(any(String.class));
-
-    }
-
-
-    @Test
-    public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndNoPreviousONSETEventFound() throws
-            Exception {
-
-        final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName";
-        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =
-                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");
-
-        final FlowletContext mockFlowletContext = mock(FlowletContext.class);
-        final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);
-        when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);
-        tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);
-
-        doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));
-        when(mockObjectMappedTable.read(any(String.class))).thenReturn(null);
-
-        final Threshold violatedThreshold = getViolatedThreshold(ControlLoopEventStatus.ABATED);
-        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =
-                getMockThresholdCalculatorOutput(violatedThreshold);
-
-        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);
-        verify(mockObjectMappedTable,
-                times(0)).write(any(String.class), any(TCAAlertsAbatementEntity.class));
-        verify(mockOutputEmitter, times(0)).emit(any(String.class));
-
-    }
-
-    @Test(expected = CDAPSettingsException.class)
-    public void testDetermineAbatementAlertsWhenControlLoopTypeIsNotOnsetOrAbated() throws
-            Exception {
-        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =
-                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");
-        final Threshold violatedThreshold = getViolatedThreshold(ControlLoopEventStatus.CONTINUE);
-        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =
-                getMockThresholdCalculatorOutput(violatedThreshold);
-
-        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);
-
-    }
-
-    private static Threshold getViolatedThreshold(final ControlLoopEventStatus controlLoopEventStatus) {
-        final Threshold violatedThreshold = Threshold.copy(metricsPerEventNames.get(0).getThresholds().get(0));
-        violatedThreshold.setClosedLoopEventStatus(controlLoopEventStatus);
-        return violatedThreshold;
-    }
-
-
-    private static ThresholdCalculatorOutput getMockThresholdCalculatorOutput(final Threshold violatedThreshold) throws
-            Exception {
-
-        final MetricsPerEventName violatedMetricsPerEventName =
-                MetricsPerEventName.copy(metricsPerEventNames.get(0));
-        violatedMetricsPerEventName.setThresholds(ImmutableList.of(violatedThreshold));
-        return getMockThresholdCalculatorOutput(
-                fromStream(CEF_MESSAGE_JSON_FILE_LOCATION),
-                fromStream(TCA_POLICY_JSON_FILE_LOCATION),
-                TCAUtils.writeValueAsString(violatedMetricsPerEventName),
-                fromStream(TCA_ALERT_JSON_FILE_LOCATION)
-        );
-    }
-
-
-    private static ThresholdCalculatorOutput getMockThresholdCalculatorOutput(final String cefMessage,
-                                                                              final String tcaPolicy,
-                                                                              final String violatedMetricsPerEventName,
-                                                                              final String alertMessage) {
-        final ThresholdCalculatorOutput thresholdCalculatorOutput = mock(ThresholdCalculatorOutput.class);
-        when(thresholdCalculatorOutput.getCefMessage()).thenReturn(cefMessage);
-        when(thresholdCalculatorOutput.getTcaPolicy()).thenReturn(tcaPolicy);
-        when(thresholdCalculatorOutput.getViolatedMetricsPerEventName()).thenReturn(violatedMetricsPerEventName);
-        when(thresholdCalculatorOutput.getAlertMessage()).thenReturn(alertMessage);
-        return thresholdCalculatorOutput;
-    }
-
-}
+/*\r
+ * ===============================LICENSE_START======================================\r
+ *  dcae-analytics\r
+ * ================================================================================\r
+ *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ *  Licensed under the Apache License, Version 2.0 (the "License");\r
+ *  you may not use this file except in compliance with the License.\r
+ *   You may obtain a copy of the License at\r
+ *\r
+ *          http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ *  Unless required by applicable law or agreed to in writing, software\r
+ *  distributed under the License is distributed on an "AS IS" BASIS,\r
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ *  See the License for the specific language governing permissions and\r
+ *  limitations under the License.\r
+ *  ============================LICENSE_END===========================================\r
+ */\r
+\r
+package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;\r
+\r
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;\r
+import co.cask.cdap.api.flow.flowlet.FlowletContext;\r
+import co.cask.cdap.api.flow.flowlet.OutputEmitter;\r
+import com.google.common.collect.ImmutableList;\r
+import org.junit.Test;\r
+import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;\r
+import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;\r
+import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;\r
+import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;\r
+import org.openecomp.dcae.apod.analytics.cdap.tca.BaseAnalyticsCDAPTCAUnitTest;\r
+import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAPolicyPreferences;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;\r
+import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;\r
+import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;\r
+\r
+import java.util.Date;\r
+import java.util.List;\r
+\r
+import static org.mockito.ArgumentMatchers.any;\r
+import static org.mockito.ArgumentMatchers.eq;\r
+import static org.mockito.Mockito.doNothing;\r
+import static org.mockito.Mockito.mock;\r
+import static org.mockito.Mockito.times;\r
+import static org.mockito.Mockito.verify;\r
+import static org.mockito.Mockito.when;\r
+\r
+/**\r
+ * @author Rajiv Singla . Creation Date: 9/12/2017.\r
+ */\r
+@SuppressWarnings("unchecked")\r
+public class TCAVESAlertsAbatementFlowletTest extends BaseAnalyticsCDAPTCAUnitTest {\r
+\r
+    private static final TCAPolicyPreferences sampleTCAPolicyPreferences = getSampleTCAPolicyPreferences();\r
+    private static final List<MetricsPerEventName> metricsPerEventNames = sampleTCAPolicyPreferences\r
+            .getMetricsPerEventName();\r
+    private final OutputEmitter<String> mockOutputEmitter = mock(OutputEmitter.class);\r
+\r
+    private class TestTCAVESAlertsAbatementFlowlet extends TCAVESAlertsAbatementFlowlet {\r
+\r
+        public TestTCAVESAlertsAbatementFlowlet(String tcaAlertsAbatementTableName) {\r
+            super(tcaAlertsAbatementTableName);\r
+            this.alertsAbatementOutputEmitter = mockOutputEmitter;\r
+            doNothing().when(mockOutputEmitter).emit(any(String.class));\r
+        }\r
+    }\r
+\r
+    @Test\r
+    public void testConfigure() throws Exception {\r
+        final TCAVESAlertsAbatementFlowlet tcavesAlertsAbatementFlowlet =\r
+                new TCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");\r
+        assertFlowletNameAndDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET,\r
+                CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET,\r
+                tcavesAlertsAbatementFlowlet);\r
+    }\r
+\r
+    @Test(expected = CDAPSettingsException.class)\r
+    public void testDetermineAbatementAlertsWhenViolatedMetricsEventNameIsBlank() throws Exception {\r
+\r
+        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =\r
+                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");\r
+        final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ONSET);\r
+        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =\r
+                getMockThresholdCalculatorOutput(violatedThreshold);\r
+        when(mockThresholdCalculatorOutput.getViolatedMetricsPerEventName()).thenReturn("");\r
+\r
+        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);\r
+    }\r
+\r
+    @Test\r
+    public void testDetermineAbatementAlertsWhenControlLoopTypeIsONSET() throws Exception {\r
+\r
+        final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName";\r
+        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =\r
+                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");\r
+\r
+        final FlowletContext mockFlowletContext = mock(FlowletContext.class);\r
+        final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);\r
+        when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);\r
+        tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);\r
+\r
+        doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));\r
+\r
+        final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ONSET);\r
+        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =\r
+                getMockThresholdCalculatorOutput(violatedThreshold);\r
+\r
+        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);\r
+        verify(mockObjectMappedTable,\r
+                times(1)).write(any(String.class), any(TCAAlertsAbatementEntity.class));\r
+        verify(mockOutputEmitter, times(1)).emit(any(String.class));\r
+\r
+    }\r
+\r
+\r
+    @Test\r
+    public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndNoPreviousAlertWasSent() throws Exception {\r
+\r
+        final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName";\r
+        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =\r
+                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");\r
+\r
+        final FlowletContext mockFlowletContext = mock(FlowletContext.class);\r
+        final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);\r
+        when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);\r
+        tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);\r
+\r
+        doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));\r
+        final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = mock(TCAAlertsAbatementEntity.class);\r
+        when(mockObjectMappedTable.read(any(String.class))).thenReturn(tcaAlertsAbatementEntity);\r
+        when(tcaAlertsAbatementEntity.getAbatementSentTS()).thenReturn(null);\r
+\r
+        final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ABATED);\r
+        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =\r
+                getMockThresholdCalculatorOutput(violatedThreshold);\r
+\r
+        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);\r
+        verify(mockObjectMappedTable,\r
+                times(1)).write(any(String.class), any(TCAAlertsAbatementEntity.class));\r
+        verify(mockOutputEmitter, times(1)).emit(any(String.class));\r
+\r
+    }\r
+\r
+    @Test\r
+    public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndPreviousAlertWasAlreadySent() throws\r
+            Exception {\r
+\r
+        final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName";\r
+        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =\r
+                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");\r
+\r
+        final FlowletContext mockFlowletContext = mock(FlowletContext.class);\r
+        final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);\r
+        when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);\r
+        tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);\r
+\r
+        doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));\r
+        final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = mock(TCAAlertsAbatementEntity.class);\r
+        when(mockObjectMappedTable.read(any(String.class))).thenReturn(tcaAlertsAbatementEntity);\r
+        final long time = new Date().getTime();\r
+        when(tcaAlertsAbatementEntity.getAbatementSentTS()).thenReturn(Long.toString(time));\r
+\r
+        final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ABATED);\r
+        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =\r
+                getMockThresholdCalculatorOutput(violatedThreshold);\r
+\r
+        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);\r
+        verify(mockObjectMappedTable,\r
+                times(0)).write(any(String.class), any(TCAAlertsAbatementEntity.class));\r
+        verify(mockOutputEmitter, times(0)).emit(any(String.class));\r
+\r
+    }\r
+\r
+\r
+    @Test\r
+    public void testDetermineAbatementAlertsWhenControlLoopTypeIsABATEDAndNoPreviousONSETEventFound() throws\r
+            Exception {\r
+\r
+        final String testTCAAlertsAbatementTableName = "testTCAAlertsAbatementTableName";\r
+        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =\r
+                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");\r
+\r
+        final FlowletContext mockFlowletContext = mock(FlowletContext.class);\r
+        final ObjectMappedTable<TCAAlertsAbatementEntity> mockObjectMappedTable = mock(ObjectMappedTable.class);\r
+        when(mockFlowletContext.getDataset(eq(testTCAAlertsAbatementTableName))).thenReturn(mockObjectMappedTable);\r
+        tcaAlertsAbatementFlowlet.initialize(mockFlowletContext);\r
+\r
+        doNothing().when(mockObjectMappedTable).write(any(String.class), any(TCAAlertsAbatementEntity.class));\r
+        when(mockObjectMappedTable.read(any(String.class))).thenReturn(null);\r
+\r
+        final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.ABATED);\r
+        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =\r
+                getMockThresholdCalculatorOutput(violatedThreshold);\r
+\r
+        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);\r
+        verify(mockObjectMappedTable,\r
+                times(0)).write(any(String.class), any(TCAAlertsAbatementEntity.class));\r
+        verify(mockOutputEmitter, times(0)).emit(any(String.class));\r
+\r
+    }\r
+\r
+    @Test(expected = CDAPSettingsException.class)\r
+    public void testDetermineAbatementAlertsWhenControlLoopTypeIsNotOnsetOrAbated() throws\r
+            Exception {\r
+        final TestTCAVESAlertsAbatementFlowlet tcaAlertsAbatementFlowlet =\r
+                new TestTCAVESAlertsAbatementFlowlet("testTCAAlertsAbatementTableName");\r
+        final Threshold violatedThreshold = getViolatedThreshold(ClosedLoopEventStatus.CONTINUE);\r
+        final ThresholdCalculatorOutput mockThresholdCalculatorOutput =\r
+                getMockThresholdCalculatorOutput(violatedThreshold);\r
+\r
+        tcaAlertsAbatementFlowlet.determineAbatementAlerts(mockThresholdCalculatorOutput);\r
+\r
+    }\r
+\r
+    private static Threshold getViolatedThreshold(final ClosedLoopEventStatus closedLoopEventStatus) {\r
+        final Threshold violatedThreshold = Threshold.copy(metricsPerEventNames.get(0).getThresholds().get(0));\r
+        violatedThreshold.setClosedLoopEventStatus(closedLoopEventStatus);\r
+        return violatedThreshold;\r
+    }\r
+\r
+\r
+    private static ThresholdCalculatorOutput getMockThresholdCalculatorOutput(final Threshold violatedThreshold) throws\r
+            Exception {\r
+\r
+        final MetricsPerEventName violatedMetricsPerEventName =\r
+                MetricsPerEventName.copy(metricsPerEventNames.get(0));\r
+        violatedMetricsPerEventName.setThresholds(ImmutableList.of(violatedThreshold));\r
+        return getMockThresholdCalculatorOutput(\r
+                fromStream(CEF_MESSAGE_JSON_FILE_LOCATION),\r
+                fromStream(TCA_POLICY_JSON_FILE_LOCATION),\r
+                TCAUtils.writeValueAsString(violatedMetricsPerEventName),\r
+                fromStream(TCA_ALERT_JSON_FILE_LOCATION)\r
+        );\r
+    }\r
+\r
+\r
+    private static ThresholdCalculatorOutput getMockThresholdCalculatorOutput(final String cefMessage,\r
+                                                                              final String tcaPolicy,\r
+                                                                              final String violatedMetricsPerEventName,\r
+                                                                              final String alertMessage) {\r
+        final ThresholdCalculatorOutput thresholdCalculatorOutput = mock(ThresholdCalculatorOutput.class);\r
+        when(thresholdCalculatorOutput.getCefMessage()).thenReturn(cefMessage);\r
+        when(thresholdCalculatorOutput.getTcaPolicy()).thenReturn(tcaPolicy);\r
+        when(thresholdCalculatorOutput.getViolatedMetricsPerEventName()).thenReturn(violatedMetricsPerEventName);\r
+        when(thresholdCalculatorOutput.getAlertMessage()).thenReturn(alertMessage);\r
+        return thresholdCalculatorOutput;\r
+    }\r
+\r
+}\r