Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / test / java / org / openecomp / dcae / apod / analytics / cdap / tca / worker / TCADMaaPMRSubscriberJobTest.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
22
23 import co.cask.cdap.api.metrics.Metrics;
24 import co.cask.cdap.api.worker.WorkerContext;
25 import com.google.common.collect.ImmutableList;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.Mockito;
29 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
30 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
31 import org.openecomp.dcae.apod.analytics.cdap.tca.BaseAnalyticsCDAPTCAUnitTest;
32 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
33 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
34 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
35 import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
36 import org.quartz.JobDataMap;
37 import org.quartz.JobExecutionContext;
38
39 import java.io.IOException;
40 import java.util.Arrays;
41 import java.util.Collections;
42
43 import static org.mockito.ArgumentMatchers.anyInt;
44 import static org.mockito.ArgumentMatchers.anyString;
45 import static org.mockito.ArgumentMatchers.eq;
46 import static org.mockito.Mockito.doNothing;
47 import static org.mockito.Mockito.doThrow;
48 import static org.mockito.Mockito.mock;
49 import static org.mockito.Mockito.verify;
50 import static org.mockito.Mockito.when;
51
52 /**
53  * @author Rajiv Singla . Creation Date: 12/20/2016.
54  */
55 public class TCADMaaPMRSubscriberJobTest extends BaseAnalyticsCDAPTCAUnitTest {
56
57     private JobExecutionContext jobExecutionContext;
58     private TCADMaaPMRSubscriberJob subscriberJob;
59     private JobDataMap jobDataMap;
60     private WorkerContext workerContext;
61     private DMaaPMRSubscriber subscriber;
62     private Metrics metrics;
63
64
65     @Before
66     public void before() throws Exception {
67
68         jobExecutionContext = mock(JobExecutionContext.class);
69         workerContext = mock(WorkerContext.class);
70
71         metrics = mock(Metrics.class);
72         doNothing().when(metrics).count(anyString(), anyInt());
73         subscriber = mock(DMaaPMRSubscriber.class);
74
75         jobDataMap = mock(JobDataMap.class);
76         when(jobDataMap.getString(eq(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME))).thenReturn
77                 (CDAPComponentsConstants.TCA_DEFAULT_SUBSCRIBER_OUTPUT_NAME_STREAM);
78         when(jobDataMap.get(eq(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME))).thenReturn(workerContext);
79         when(jobDataMap.get(eq(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME))).thenReturn(subscriber);
80         when(jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME)).thenReturn(metrics);
81         when(jobExecutionContext.getMergedJobDataMap()).thenReturn(jobDataMap);
82
83         doNothing().when(workerContext).write(anyString(), anyString());
84
85         subscriberJob = new TCADMaaPMRSubscriberJob();
86     }
87
88     @Test
89     public void testExecuteWhenMessagesAreFound() throws Exception {
90         final DMaaPMRSubscriberResponse subscriberResponse = mock(DMaaPMRSubscriberResponse.class);
91         when(subscriberResponse.getResponseCode()).thenReturn(200);
92         when(subscriberResponse.getResponseMessage()).thenReturn("testMessage");
93         when(subscriberResponse.getFetchedMessages()).thenReturn(ImmutableList.of("testMessage1", "testMessage1"));
94         when(subscriber.fetchMessages()).thenReturn(subscriberResponse);
95         subscriberJob.execute(jobExecutionContext);
96         verify(metrics, Mockito.times(1)).count(eq(CDAPMetricsConstants
97                 .DMAAP_MR_SUBSCRIBER_TOTAL_MESSAGES_PROCESSED_METRIC), eq(2));
98     }
99
100     @Test
101     public void testExecuteWhenNoMessagesFound() throws Exception {
102         final DMaaPMRSubscriberResponse subscriberResponse = mock(DMaaPMRSubscriberResponse.class);
103         when(subscriberResponse.getResponseCode()).thenReturn(200);
104         when(subscriberResponse.getResponseMessage()).thenReturn("no messages");
105         when(subscriberResponse.getFetchedMessages()).thenReturn(Collections.<String>emptyList());
106         when(subscriber.fetchMessages()).thenReturn(subscriberResponse);
107         subscriberJob.execute(jobExecutionContext);
108         verify(metrics, Mockito.times(1)).count(eq(CDAPMetricsConstants
109                 .DMAAP_MR_SUBSCRIBER_RESPONSES_WITH_NO_MESSAGES_METRIC), eq(1));
110     }
111
112
113     @Test
114     public void testExecuteWhenSubscriberReturnNonSuccessfulReturnCode() throws Exception {
115         final DMaaPMRSubscriberResponse subscriberResponse = mock(DMaaPMRSubscriberResponse.class);
116         when(subscriberResponse.getResponseCode()).thenReturn(500);
117         when(subscriber.fetchMessages()).thenReturn(subscriberResponse);
118         subscriberJob.execute(jobExecutionContext);
119         verify(metrics, Mockito.times(1)).count(eq(CDAPMetricsConstants
120                 .DMAAP_MR_SUBSCRIBER_UNSUCCESSFUL_RESPONSES_METRIC), eq(1));
121     }
122
123     @Test(expected = DCAEAnalyticsRuntimeException.class)
124     public void testExecuteWhenWritingToCDAPStreamThrowsException() throws Exception {
125         final DMaaPMRSubscriberResponse subscriberResponse = mock(DMaaPMRSubscriberResponse.class);
126         when(subscriberResponse.getResponseCode()).thenReturn(200);
127         when(subscriberResponse.getFetchedMessages()).thenReturn(Arrays.asList("TestMessage"));
128         when(subscriber.fetchMessages()).thenReturn(subscriberResponse);
129         doThrow(new IOException()).when(workerContext).write(anyString(), anyString());
130         subscriberJob.execute(jobExecutionContext);
131     }
132
133
134
135 }