2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
23 import co.cask.cdap.api.metrics.Metrics;
24 import co.cask.cdap.api.worker.WorkerContext;
25 import com.google.common.collect.ImmutableList;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.Mockito;
29 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
30 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
31 import org.openecomp.dcae.apod.analytics.cdap.tca.BaseAnalyticsCDAPTCAUnitTest;
32 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
33 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
34 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
35 import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
36 import org.quartz.JobDataMap;
37 import org.quartz.JobExecutionContext;
39 import java.io.IOException;
40 import java.util.Arrays;
41 import java.util.Collections;
43 import static org.mockito.ArgumentMatchers.anyInt;
44 import static org.mockito.ArgumentMatchers.anyString;
45 import static org.mockito.ArgumentMatchers.eq;
46 import static org.mockito.Mockito.doNothing;
47 import static org.mockito.Mockito.doThrow;
48 import static org.mockito.Mockito.mock;
49 import static org.mockito.Mockito.verify;
50 import static org.mockito.Mockito.when;
53 * @author Rajiv Singla . Creation Date: 12/20/2016.
55 public class TCADMaaPMRSubscriberJobTest extends BaseAnalyticsCDAPTCAUnitTest {
57 private JobExecutionContext jobExecutionContext;
58 private TCADMaaPMRSubscriberJob subscriberJob;
59 private JobDataMap jobDataMap;
60 private WorkerContext workerContext;
61 private DMaaPMRSubscriber subscriber;
62 private Metrics metrics;
66 public void before() throws Exception {
68 jobExecutionContext = mock(JobExecutionContext.class);
69 workerContext = mock(WorkerContext.class);
71 metrics = mock(Metrics.class);
72 doNothing().when(metrics).count(anyString(), anyInt());
73 subscriber = mock(DMaaPMRSubscriber.class);
75 jobDataMap = mock(JobDataMap.class);
76 when(jobDataMap.getString(eq(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME))).thenReturn
77 (CDAPComponentsConstants.TCA_DEFAULT_SUBSCRIBER_OUTPUT_NAME_STREAM);
78 when(jobDataMap.get(eq(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME))).thenReturn(workerContext);
79 when(jobDataMap.get(eq(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME))).thenReturn(subscriber);
80 when(jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME)).thenReturn(metrics);
81 when(jobExecutionContext.getMergedJobDataMap()).thenReturn(jobDataMap);
83 doNothing().when(workerContext).write(anyString(), anyString());
85 subscriberJob = new TCADMaaPMRSubscriberJob();
89 public void testExecuteWhenMessagesAreFound() throws Exception {
90 final DMaaPMRSubscriberResponse subscriberResponse = mock(DMaaPMRSubscriberResponse.class);
91 when(subscriberResponse.getResponseCode()).thenReturn(200);
92 when(subscriberResponse.getResponseMessage()).thenReturn("testMessage");
93 when(subscriberResponse.getFetchedMessages()).thenReturn(ImmutableList.of("testMessage1", "testMessage1"));
94 when(subscriber.fetchMessages()).thenReturn(subscriberResponse);
95 subscriberJob.execute(jobExecutionContext);
96 verify(metrics, Mockito.times(1)).count(eq(CDAPMetricsConstants
97 .DMAAP_MR_SUBSCRIBER_TOTAL_MESSAGES_PROCESSED_METRIC), eq(2));
101 public void testExecuteWhenNoMessagesFound() throws Exception {
102 final DMaaPMRSubscriberResponse subscriberResponse = mock(DMaaPMRSubscriberResponse.class);
103 when(subscriberResponse.getResponseCode()).thenReturn(200);
104 when(subscriberResponse.getResponseMessage()).thenReturn("no messages");
105 when(subscriberResponse.getFetchedMessages()).thenReturn(Collections.<String>emptyList());
106 when(subscriber.fetchMessages()).thenReturn(subscriberResponse);
107 subscriberJob.execute(jobExecutionContext);
108 verify(metrics, Mockito.times(1)).count(eq(CDAPMetricsConstants
109 .DMAAP_MR_SUBSCRIBER_RESPONSES_WITH_NO_MESSAGES_METRIC), eq(1));
114 public void testExecuteWhenSubscriberReturnNonSuccessfulReturnCode() throws Exception {
115 final DMaaPMRSubscriberResponse subscriberResponse = mock(DMaaPMRSubscriberResponse.class);
116 when(subscriberResponse.getResponseCode()).thenReturn(500);
117 when(subscriber.fetchMessages()).thenReturn(subscriberResponse);
118 subscriberJob.execute(jobExecutionContext);
119 verify(metrics, Mockito.times(1)).count(eq(CDAPMetricsConstants
120 .DMAAP_MR_SUBSCRIBER_UNSUCCESSFUL_RESPONSES_METRIC), eq(1));
123 @Test(expected = DCAEAnalyticsRuntimeException.class)
124 public void testExecuteWhenWritingToCDAPStreamThrowsException() throws Exception {
125 final DMaaPMRSubscriberResponse subscriberResponse = mock(DMaaPMRSubscriberResponse.class);
126 when(subscriberResponse.getResponseCode()).thenReturn(200);
127 when(subscriberResponse.getFetchedMessages()).thenReturn(Arrays.asList("TestMessage"));
128 when(subscriber.fetchMessages()).thenReturn(subscriberResponse);
129 doThrow(new IOException()).when(workerContext).write(anyString(), anyString());
130 subscriberJob.execute(jobExecutionContext);