2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.dmaap.service.publisher;
23 import org.apache.commons.lang3.tuple.ImmutablePair;
24 import org.apache.http.client.ResponseHandler;
25 import org.apache.http.client.methods.HttpPost;
26 import org.apache.http.client.methods.HttpUriRequest;
27 import org.apache.http.impl.client.CloseableHttpClient;
28 import org.junit.After;
29 import org.junit.Before;
30 import org.junit.Rule;
31 import org.junit.Test;
32 import org.junit.rules.ExpectedException;
33 import org.junit.runner.RunWith;
34 import org.mockito.Mock;
35 import org.mockito.Mockito;
36 import org.mockito.junit.MockitoJUnitRunner;
37 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
38 import org.openecomp.dcae.apod.analytics.dmaap.BaseAnalyticsDMaaPUnitTest;
39 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
40 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
42 import java.io.IOException;
43 import java.util.ArrayList;
45 import static org.hamcrest.CoreMatchers.isA;
46 import static org.hamcrest.core.Is.is;
47 import static org.junit.Assert.assertThat;
48 import static org.mockito.BDDMockito.given;
49 import static org.mockito.Mockito.times;
50 import static org.mockito.Mockito.verify;
53 * @author Rajiv Singla . Creation Date: 10/21/2016.
55 @RunWith(MockitoJUnitRunner.class)
56 public class DMaaPMRPublisherImplTest extends BaseAnalyticsDMaaPUnitTest {
59 DMaaPMRPublisherQueueFactory dmaapMRPublisherQueueFactory;
61 CloseableHttpClient closeableHttpClient;
63 DMaaPMRPublisherQueue dmaapMRPublisherQueue;
66 public void setUp() throws Exception {
67 given(dmaapMRPublisherQueueFactory.create(Mockito.anyInt(), Mockito.anyInt()))
68 .willReturn(dmaapMRPublisherQueue);
72 public void tearDown() throws Exception {
76 public void testPublishSmallMessageList() throws Exception {
77 given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(10);
78 given(dmaapMRPublisherQueue.addBatchMessages(Mockito.<String>anyList())).willReturn(2);
80 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
81 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
83 DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages());
85 assertThat(dmaapMRPublisherResponse.getResponseCode(), is(202));
86 assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(2));
87 assertThat(dmaapMRPublisherResponse.getResponseMessage(),
88 is("Accepted - Messages queued for batch publishing to MR Topic"));
92 public void testPublishBigMessageList() throws Exception {
94 given(dmaapMRPublisherQueue.getBatchQueueRemainingSize()).willReturn(0);
95 given(dmaapMRPublisherQueue.getMessageForPublishing()).willReturn(getTwoSampleMessages());
97 closeableHttpClient.execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
98 .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
100 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
101 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
103 DMaaPMRPublisherResponse dmaapMRPublisherResponse = dmaapMRPublisherImpl.publish(getTwoSampleMessages());
105 assertThat(dmaapMRPublisherResponse.getResponseCode(), is(200));
106 assertThat(dmaapMRPublisherResponse.getPendingMessagesCount(), is(200));
107 assertThat(dmaapMRPublisherResponse.getResponseMessage(), is("Message successfully posted"));
111 public void testForcePublishSuccessful() throws Exception {
112 DMaaPMRPublisherConfig dmaapMRPublisherConfig = new
113 DMaaPMRPublisherConfig.Builder(HOST_NAME, TOPIC_NAME)
114 .setPortNumber(PORT_NUMBER)
115 .setProtocol(HTTP_PROTOCOL)
116 .setContentType(CONTENT_TYPE)
117 .setMaxRecoveryQueueSize(PUBLISHER_MAX_RECOVERY_QUEUE_SIZE)
118 .setMaxBatchSize(PUBLISHER_MAX_BATCH_QUEUE_SIZE).build();
120 HttpPost httpPost = Mockito.mock(HttpPost.class);
121 Mockito.when(closeableHttpClient.execute(
122 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
123 .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
125 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
126 dmaapMRPublisherConfig, dmaapMRPublisherQueueFactory, closeableHttpClient);
127 DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
128 assertThat(response.getResponseCode(), is(200));
132 public void testForcePublishFailure() throws Exception {
133 Mockito.when(closeableHttpClient.execute(
134 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
135 .thenReturn(new ImmutablePair<>(503, "Message successfully posted"));
137 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
138 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
139 DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
140 assertThat(response.getResponseCode(), is(503));
144 public ExpectedException httpIOException = ExpectedException.none();
147 public void testForcePublishHttpFailure() throws Exception {
149 httpIOException.expect(DCAEAnalyticsRuntimeException.class);
150 httpIOException.expectCause(isA(IOException.class));
152 given(closeableHttpClient.execute(
153 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class))).willThrow(IOException.class);
155 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
156 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
157 DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.forcePublish(getTwoSampleMessages());
161 public void testFlushSuccessful() throws Exception {
162 Mockito.when(closeableHttpClient.execute(
163 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
164 .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
166 Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
168 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
169 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
170 DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush();
171 assertThat(response.getResponseCode(), is(200));
175 public void testFlushEmptyList() throws Exception {
176 Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
178 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
179 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
180 DMaaPMRPublisherResponse response = dmaapMRPublisherImpl.flush();
181 assertThat(response.getResponseCode(), is(204));
185 public void testClose() throws Exception {
186 Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
187 Mockito.when(closeableHttpClient.execute(
188 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
189 .thenReturn(new ImmutablePair<>(200, "Message successfully posted"));
190 Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
192 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
193 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
194 dmaapMRPublisherImpl.close();
195 verify(closeableHttpClient).execute(Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class));
199 public void testCloseUnsuccessful() throws Exception {
200 Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(new ArrayList());
201 Mockito.when(closeableHttpClient.execute(
202 Mockito.any(HttpUriRequest.class), Mockito.any(ResponseHandler.class)))
203 .thenReturn(new ImmutablePair<>(400, "Message successfully posted"));
204 Mockito.when(dmaapMRPublisherQueue.getMessageForPublishing()).thenReturn(getTwoSampleMessages());
206 DMaaPMRPublisherImpl dmaapMRPublisherImpl = new DMaaPMRPublisherImpl(
207 getPublisherConfig(), dmaapMRPublisherQueueFactory, closeableHttpClient);
208 dmaapMRPublisherImpl.close();
209 verify(closeableHttpClient, times(6)).execute(Mockito.any(HttpUriRequest.class),
210 Mockito.any(ResponseHandler.class));