Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / test / java / org / openecomp / dcae / apod / analytics / dmaap / service / publisher / DMaaPMRPublisherImplTest.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
22
23 import org.apache.commons.lang3.tuple.ImmutablePair;
24 import org.apache.http.client.ResponseHandler;
25 import org.apache.http.client.methods.HttpPost;
26 import org.apache.http.client.methods.HttpUriRequest;
27 import org.apache.http.impl.client.CloseableHttpClient;
28 import org.junit.After;
29 import org.junit.Before;
30 import org.junit.Rule;
31 import org.junit.Test;
32 import org.junit.rules.ExpectedException;
33 import org.junit.runner.RunWith;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.runners.MockitoJUnitRunner;
37 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
38 import org.openecomp.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest;
39 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
40 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
41
42 import java.io.IOException;
43 import java.util.ArrayList;
44
45 import static org.hamcrest.CoreMatchers.isA;
46 import static org.hamcrest.core.Is.is;
47 import static org.junit.Assert.assertThat;
48 import static org.mockito.BDDMockito.given;
49 import static org.mockito.Mockito.times;
50 import static org.mockito.Mockito.verify;
51
52 /**
53  * @author Rajiv Singla . Creation Date: 10/21/2016.
54  */
55 @RunWith(MockitoJUnitRunner.class)
56 public class DMaaPMRPublisherImplTest extends BaseAnalyticsDMaaPUnitTest {
57
58     @Mock
59     DMaaPMRPublisherQueueFactory dmaapMRPublisherQueueFactory;
60     @Mock
61     CloseableHttpClient closeableHttpClient;
62     @Mock
63     DMaaPMRPublisherQueue dmaapMRPublisherQueue;
64
65     @Before
66     public void setUp() throws Exception {
67         given(dmaapMRPublisherQueueFactory.create(Mockito.anyInt(), Mockito.anyInt()))
68                 .willReturn(dmaapMRPublisherQueue);
69     }
70
71     @After
72     public void tearDown() throws Exception {
73     }
74
75     @Test
76     public void testPublishSmallMessageList() throws Exception {
77         given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(10);
78         given(dmaapMRPublisherQueue.addBatchMessages(Mockito.<String>anyList())).willReturn(2);
79
80         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
81                 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
82
83         DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages());
84
85         assertThat(dmaapMRPublisherResponse.getResponseCode(), is(202));
86         assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(2));
87         assertThat(dmaapMRPublisherResponse.getResponseMessage(),
88                 is("Accepted - Messages queued for batch publishing to MR Topic"));
89     }
90
91     @Test
92     public void testPublishBigMessageList() throws Exception {
93
94         given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(0);
95         given(dmaapMRPublisherQueue.getMessageForPublishing()).willReturn(getTwoSampleMessages());
96         Mockito.when(
97                 closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
98                 .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
99
100         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
101                 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
102
103         DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages());
104
105         assertThat(dmaapMRPublisherResponse.getResponseCode(), is(200));
106         assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(200));
107         assertThat(dmaapMRPublisherResponse.getResponseMessage(), is("Message successfully posted"));
108     }
109
110     @Test
111     public void testForcePublishSuccessful() throws Exception {
112         DMaaPMRPublisherConfig dmaapMRPublisherConfig = new
113                 DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME)
114                 .setPortNumber(PORT_NUMBER)
115                 .setProtocol(HTTP_PROTOCOL)
116                 .setContentType(CONTENT_TYPE)
117                 .setMaxRecoveryQueueSize(PUBLISHER_MAX_RECOVERY_QUEUE_SIZE)
118                 .setMaxBatchSize(PUBLISHER_MAX_BATCH_QUEUE_SIZE).build();
119
120         HttpPost httpPost = Mockito.mock(HttpPost.class);
121         Mockito.when(closeableHttpClient.execute(
122                 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
123                 .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
124
125         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
126                 dmaapMRPublisherConfig, dmaapMRPublisherQueueFactory, closeableHttpClient);
127         DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
128         assertThat(response.getResponseCode(), is(200));
129     }
130
131     @Test
132     public void testForcePublishFailure() throws Exception {
133         Mockito.when(closeableHttpClient.execute(
134                 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
135                 .thenReturn(new ImmutablePair<>(503, "Message successfully posted"));
136
137         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
138                 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
139         DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
140         assertThat(response.getResponseCode(), is(503));
141     }
142
143     @Rule
144     public ExpectedException httpIOException = ExpectedException.none();
145
146     @Test
147     public void testForcePublishHttpFailure() throws Exception {
148
149         httpIOException.expect(DCAEAnalyticsRuntimeException.class);
150         httpIOException.expectCause(isA(IOException.class));
151
152         given(closeableHttpClient.execute(
153                 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))).willThrow(IOException.class);
154
155         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
156                 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
157         DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
158     }
159
160     @Test
161     public void testFlushSuccessful() throws Exception {
162         Mockito.when(closeableHttpClient.execute(
163                 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
164                 .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
165
166         Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
167
168         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
169                 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
170         DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush();
171         assertThat(response.getResponseCode(), is(200));
172     }
173
174     @Test
175     public void testFlushEmptyList() throws Exception {
176         Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
177
178         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
179                 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
180         DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush();
181         assertThat(response.getResponseCode(), is(204));
182     }
183
184     @Test
185     public void testClose() throws Exception {
186         Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
187         Mockito.when(closeableHttpClient.execute(
188                 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
189                 .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
190         Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
191
192         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
193                 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
194         dmaapMRPublisherImpl.close();
195         verify(closeableHttpClient).execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class));
196     }
197
198     @Test
199     public void testCloseUnsuccessful() throws Exception {
200         Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
201         Mockito.when(closeableHttpClient.execute(
202                 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
203                 .thenReturn(new ImmutablePair<>(400, "Message successfully posted"));
204         Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
205
206         DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
207                 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
208         dmaapMRPublisherImpl.close();
209         verify(closeableHttpClient, times(6)).execute(Mockito.any(HttpUriRequest.class),
210                 Mockito.any(ResponseHandler.class));
211     }
212 }