4abbe8970e9480be705415e93fea66ebc7b4e888
[dmaap/messagerouter/msgrtr.git] / src / test / java / org / onap / dmaap / dmf / mr / service / impl / EventsServiceImplTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP Policy Engine
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dmaap.dmf.mr.service.impl;
22
23 import static org.hamcrest.CoreMatchers.containsString;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Matchers.any;
28 import static org.mockito.Matchers.anyInt;
29 import static org.mockito.Matchers.anyLong;
30 import static org.mockito.Matchers.anyString;
31 import static org.mockito.Matchers.eq;
32 import static org.mockito.Mockito.doNothing;
33 import static org.mockito.Mockito.doReturn;
34 import static org.mockito.Mockito.doThrow;
35 import static org.mockito.Mockito.mock;
36 import static org.mockito.Mockito.never;
37 import static org.mockito.Mockito.verify;
38 import static org.mockito.Mockito.when;
39
40 import com.att.nsa.limits.Blacklist;
41 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
42 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
43 import java.io.ByteArrayInputStream;
44 import java.io.IOException;
45 import java.io.InputStream;
46 import java.util.ArrayList;
47 import java.util.Collections;
48 import java.util.ConcurrentModificationException;
49 import javax.servlet.http.HttpServletRequest;
50 import joptsimple.internal.Strings;
51 import org.apache.http.HttpStatus;
52 import org.apache.kafka.clients.producer.ProducerRecord;
53 import org.json.JSONObject;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.rules.ExpectedException;
58 import org.junit.runner.RunWith;
59 import org.mockito.ArgumentCaptor;
60 import org.mockito.InOrder;
61 import org.mockito.Mock;
62 import org.mockito.Mockito;
63 import org.mockito.MockitoAnnotations;
64 import org.mockito.Spy;
65 import org.mockito.runners.MockitoJUnitRunner;
66 import org.onap.dmaap.dmf.mr.CambriaApiException;
67 import org.onap.dmaap.dmf.mr.backends.Consumer;
68 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
69 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
70 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
71 import org.onap.dmaap.dmf.mr.backends.Publisher;
72 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
73 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
74 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
75 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
76 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
77 import org.onap.dmaap.dmf.mr.metabroker.Topic;
78 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
79 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
80 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
81 import org.springframework.mock.web.MockHttpServletRequest;
82 import org.springframework.mock.web.MockHttpServletResponse;
83
84 @RunWith(MockitoJUnitRunner.class)
85 public class EventsServiceImplTest {
86
87     private InputStream iStream = null;
88     private DMaaPContext dMaapContext = new DMaaPContext();
89     private DMaaPErrorMessages pErrorMessages = new DMaaPErrorMessages();
90     @Mock
91     private ConfigurationReader configurationReader;
92     @Mock
93     private Blacklist blacklist;
94     @Mock
95     private DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator;
96     @Mock
97     private NsaSimpleApiKey nsaSimpleApiKey;
98     @Mock
99     private DMaaPKafkaMetaBroker dmaapKafkaMetaBroker;
100     @Mock
101     private Topic createdTopic;
102     @Mock
103     private ConsumerFactory factory;
104     @Mock
105     private Consumer consumer;
106     @Mock
107     private Publisher publisher;
108     @Mock
109     private DMaaPCambriaLimiter limiter;
110     @Mock
111     private MetricsSet metrics;
112     @Spy
113     private EventsServiceImpl eventsService;
114
115
116     @Rule
117     public ExpectedException thrown = ExpectedException.none();
118
119     private MockHttpServletRequest request;
120
121     @Before
122     public void setUp() throws Exception {
123         MockitoAnnotations.initMocks(this);
124         String source = "source of my InputStream";
125         iStream = new ByteArrayInputStream(source.getBytes("UTF-8"));
126
127         request = new MockHttpServletRequest();
128         MockHttpServletResponse response = new MockHttpServletResponse();
129         dMaapContext.setRequest(request);
130         dMaapContext.setResponse(response);
131         when(blacklist.contains(anyString())).thenReturn(false);
132         when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
133         when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
134         dMaapContext.setConfigReader(configurationReader);
135         eventsService.setErrorMessages(pErrorMessages);
136         doReturn("100").when(eventsService).getPropertyFromAJSCmap("timeout");
137     }
138
139     @Test
140     public void getEvents_shouldFailOnAafAuthorization() throws Exception {
141         String topicPrefix = "org.onap.aaf.enforced";
142         String topicName = topicPrefix + ".topicName";
143         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
144         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
145         when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
146         when(eventsService.isCadiEnabled()).thenReturn(true);
147
148         thrown.expect(DMaaPAccessDeniedException.class);
149         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED)));
150
151         eventsService.getEvents(dMaapContext, topicName, "CG1", "23");
152     }
153
154     @Test
155     public void getEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception {
156         String remoteIp = "10.154.17.115";
157         request.setRemoteAddr(remoteIp);
158         when(blacklist.contains(remoteIp)).thenReturn(true);
159         when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
160
161         thrown.expect(CambriaApiException.class);
162         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN)));
163
164         eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
165     }
166
167     @Test
168     public void getEvents_shouldFail_whenRequestedTopicNotExists() throws Exception {
169         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
170         when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null);
171
172         thrown.expect(CambriaApiException.class);
173         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
174
175         eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
176     }
177
178     @Test
179     public void getEvents_shouldFail_whenConsumerLockCannotBeAcquired() throws Exception {
180         //given
181         String topicName = "testTopic345";
182         String consumerGroup = "CG5";
183         String clientId = "13";
184         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
185         when(configurationReader.getfRateLimiter()).thenReturn(limiter);
186         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
187         when(configurationReader.getfConsumerFactory()).thenReturn(factory);
188         when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
189         doThrow(new UnavailableException("Could not acquire consumer lock")).when(factory)
190             .getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
191
192         thrown.expect(CambriaApiException.class);
193         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)));
194
195         //when
196         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
197
198         //then
199         verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
200
201     }
202
203     @Test
204     public void getEvents_shouldFail_whenBrokerServicesAreUnavailable() throws Exception {
205         String topicName = "testTopic";
206         String consumerGroup = "CG1";
207         String clientId = "23";
208         when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null);
209         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
210         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
211         when(configurationReader.getfConsumerFactory()).thenReturn(factory);
212
213         givenUserAuthorizedWithAAF(request, topicName, "sub");
214
215         thrown.expect(CambriaApiException.class);
216         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)));
217
218         //when
219         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
220
221         //then
222         verify(factory).destroyConsumer(topicName, consumerGroup, clientId);
223     }
224
225     private void givenUserAuthorizedWithAAF(MockHttpServletRequest request, String topicName, String operation) {
226         String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|" + operation;
227         request.addUserRole(permission);
228     }
229
230     @Test
231     public void getEvents_shouldHandleConcurrentModificationError() throws Exception {
232         String testTopic = "testTopic";
233         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
234         when(dmaapKafkaMetaBroker.getTopic(testTopic)).thenReturn(createdTopic);
235         when(configurationReader.getfConsumerFactory()).thenReturn(factory);
236         when(configurationReader.getfRateLimiter()).thenThrow(new ConcurrentModificationException("Error occurred"));
237         givenUserAuthorizedWithAAF(request, testTopic, "sub");
238
239         thrown.expect(CambriaApiException.class);
240         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_CONFLICT)));
241
242         eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
243     }
244
245     @Test
246     public void getEvents_shouldNotAuthorizeClient_whenSubscribingToMetricsTopic() throws Exception {
247         //given
248         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
249         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
250         dMaapContext.setRequest(permittedRequest);
251         String metricsTopicName = "msgrtr.apinode.metrics.dmaap";
252         String consumerGroup = "CG5";
253         String clientId = "7";
254         givenConfiguredWithMocks(metricsTopicName);
255         when(factory.getConsumerFor(eq(metricsTopicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
256             .thenReturn(consumer);
257         doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
258
259         //when
260         eventsService.getEvents(dMaapContext, metricsTopicName, consumerGroup, clientId);
261
262         //then
263         verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
264         verify(dmaaPAuthenticator, never()).authenticate(dMaapContext);
265         verify(permittedRequest, never()).isUserInRole(anyString());
266     }
267
268     @Test
269     public void getEvents_shouldNotAuthorizeClient_whenTopicNoteEnforcedWithAaf_andTopicHasNoOwnerSet()
270         throws Exception {
271         //given
272         String topicName = "someSimpleTopicName";
273         String consumerGroup = "CG5";
274         String clientId = "7";
275         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
276         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
277         dMaapContext.setRequest(permittedRequest);
278         givenConfiguredWithMocks(topicName);
279         when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
280             .thenReturn(consumer);
281         doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
282         when(createdTopic.getOwner()).thenReturn(Strings.EMPTY);
283
284         //when
285         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
286
287         //then
288         verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
289         verify(dmaaPAuthenticator, never()).authenticate(dMaapContext);
290         verify(permittedRequest, never()).isUserInRole(anyString());
291     }
292
293     @Test
294     public void getEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoReadPermissionToTopic()
295         throws Exception {
296         //given
297         String topicName = "someSimpleTopicName";
298         String consumerGroup = "CG5";
299         String clientId = "7";
300         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
301         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
302         dMaapContext.setRequest(permittedRequest);
303         givenConfiguredWithMocks(topicName);
304         when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
305             .thenReturn(consumer);
306         doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
307         when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner");
308         when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
309         doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserRead(nsaSimpleApiKey);
310
311         thrown.expect(AccessDeniedException.class);
312
313         //when
314         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
315
316         //then
317         verify(createdTopic).checkUserRead(nsaSimpleApiKey);
318         verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
319         verify(permittedRequest, never()).isUserInRole(anyString());
320     }
321
322
323     @Test
324     public void getEvents_shouldSuccessfullyRegisterConsumerToEventsStream_withAafAuthorization() throws Exception {
325         //given
326         String topicName = "testTopic";
327         String consumerGroup = "CG2";
328         String clientId = "6";
329         String messageLimit = "10";
330         String timeout = "25";
331         String meta = "yes";
332         String pretty = "on";
333         String cacheEnabled = "false";
334
335         givenConfiguredWithMocks(topicName);
336         givenConfiguredWithProperties(messageLimit, timeout, meta, pretty, cacheEnabled);
337         when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
338             .thenReturn(consumer);
339         givenUserAuthorizedWithAAF(request, topicName, "sub");
340
341         //when
342         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
343
344         //then
345         ArgumentCaptor<CambriaOutboundEventStream> osWriter = ArgumentCaptor.forClass(CambriaOutboundEventStream.class);
346         verifyInvocationOrderForSuccessCase(topicName, consumerGroup, clientId, osWriter);
347         assertEventStreamProperties(osWriter.getValue(), messageLimit, timeout);
348     }
349
350     private void assertEventStreamProperties(CambriaOutboundEventStream stream, String messageLimit, String timeout) {
351         assertEquals(Integer.valueOf(messageLimit).intValue(), stream.getfLimit());
352         assertEquals(Integer.valueOf(timeout).intValue(), stream.getfTimeoutMs());
353         assertTrue(stream.isfWithMeta());
354         assertTrue(stream.isfPretty());
355     }
356
357     private void givenConfiguredWithProperties(String messageLimit, String timeout, String meta, String pretty,
358         String cacheEnabled) {
359         when(eventsService.getPropertyFromAJSCmap("meta")).thenReturn(meta);
360         when(eventsService.getPropertyFromAJSCmap("pretty")).thenReturn(pretty);
361         when(eventsService.getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache)).thenReturn(cacheEnabled);
362         request.addParameter("timeout", timeout);
363         request.addParameter("limit", messageLimit);
364     }
365
366     private void givenConfiguredWithMocks(String topicName) throws Exception {
367         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
368         when(configurationReader.getfRateLimiter()).thenReturn(limiter);
369         when(configurationReader.getfMetrics()).thenReturn(metrics);
370         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
371         when(configurationReader.getfConsumerFactory()).thenReturn(factory);
372         when(configurationReader.getfPublisher()).thenReturn(publisher);
373     }
374
375     private void verifyInvocationOrderForSuccessCase(String topicName, String consumerGroup, String clientId,
376         ArgumentCaptor<CambriaOutboundEventStream> osWriter) throws Exception {
377
378         InOrder inOrder = Mockito.inOrder(configurationReader, factory, metrics, limiter, consumer, eventsService);
379         inOrder.verify(configurationReader).getfMetrics();
380         inOrder.verify(configurationReader).getfRateLimiter();
381         inOrder.verify(limiter).onCall(eq(topicName), eq(consumerGroup), eq(clientId), anyString());
382         inOrder.verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
383         inOrder.verify(eventsService).respondOkWithStream(eq(dMaapContext), osWriter.capture());
384         inOrder.verify(consumer).commitOffsets();
385         inOrder.verify(metrics).consumeTick(anyInt());
386         inOrder.verify(limiter).onSend(eq(topicName), eq(consumerGroup), eq(clientId), anyLong());
387         inOrder.verify(consumer).close();
388         inOrder.verifyNoMoreInteractions();
389     }
390
391     @Test
392     public void pushEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception {
393         String remoteIp = "10.132.64.112";
394         request.setRemoteAddr(remoteIp);
395         when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
396         when(blacklist.contains(anyString())).thenReturn(true);
397
398         thrown.expect(CambriaApiException.class);
399         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN)));
400
401         eventsService.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
402     }
403
404
405     @Test
406     public void pushEvents_shouldFail_whenRequestedTopicDoesNotExist() throws Exception {
407         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
408         when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null);
409
410         thrown.expect(CambriaApiException.class);
411         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
412
413         eventsService.pushEvents(dMaapContext, "testTopic", iStream, "5", "13:00:00");
414     }
415
416     @Test
417     public void pushEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoWritePermissionToTopic()
418         throws Exception {
419         //given
420         String topicName = "someSimpleTopicName";
421
422         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
423         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
424         dMaapContext.setRequest(permittedRequest);
425         givenConfiguredWithMocks(topicName);
426         when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner");
427         when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
428         doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserWrite(nsaSimpleApiKey);
429
430         thrown.expect(AccessDeniedException.class);
431
432         //when
433         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
434
435         //then
436         verify(createdTopic).checkUserWrite(nsaSimpleApiKey);
437         verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
438         verify(permittedRequest, never()).isUserInRole(anyString());
439     }
440
441     @Test
442     public void pushEvents_shouldFailOnAafAuthorization_whenCadiIsEnabled_topicNameEnforced_andUserHasNoPermission()
443         throws Exception {
444         //given
445         String topicPrefix = "org.onap.aaf.enforced";
446         String topicName = topicPrefix + ".topicName";
447         String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|pub";
448         HttpServletRequest deniedRequest = mock(HttpServletRequest.class);
449         when(deniedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
450         when(deniedRequest.isUserInRole(permission)).thenReturn(false);
451         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
452         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
453         when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
454         when(eventsService.isCadiEnabled()).thenReturn(true);
455         dMaapContext.setRequest(deniedRequest);
456
457         thrown.expect(DMaaPAccessDeniedException.class);
458         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED)));
459
460         //when
461         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
462
463         //then
464         verify(deniedRequest).isUserInRole(permission);
465     }
466
467
468     @Test
469     public void pushEvents_shouldPublishMessagesWithoutTransaction() throws Exception {
470         //given
471         String topicName = "topicWithoutTransaction";
472         givenConfiguredWithMocks(topicName);
473         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
474
475         //when
476         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
477
478         //then
479         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
480         ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
481         verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
482         assertEquals(1, captor.getValue().getLong("count"));
483     }
484
485     @Test
486     public void pushEvents_shouldHandlePublisherError_whenPushWithoutTransaction() throws Exception {
487         //given
488         String topicName = "topicWithoutTransaction";
489         givenConfiguredWithMocks(topicName);
490         doThrow(new IOException()).when(publisher)
491             .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
492
493         thrown.expect(CambriaApiException.class);
494         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
495
496         //when
497         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
498
499         //then
500         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
501         verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class));
502     }
503
504
505     @Test
506     public void pushEvents_shouldPublishMessagesWithTransaction() throws Exception {
507         //given
508         String topicPrefix = "org.onap.dmaap.mr";
509         String topicName = topicPrefix + ".topicWithTransaction";
510         givenConfiguredWithMocks(topicName);
511         when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
512         when(eventsService.isCadiEnabled()).thenReturn(true);
513         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
514
515         request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub");
516
517         //when
518         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
519
520         //then
521         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
522         ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
523         verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
524         assertEquals(1, captor.getValue().getLong("count"));
525         assertFalse(captor.getValue().getString("transactionId").isEmpty());
526     }
527
528     @Test
529     public void pushEvents_shouldHandlePublisherError_whenPushWithTransaction() throws Exception {
530         //given
531         String topicPrefix = "org.onap.dmaap.mr";
532         String topicName = topicPrefix + ".topicWithTransaction";
533         givenConfiguredWithMocks(topicName);
534         when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
535         when(eventsService.isCadiEnabled()).thenReturn(true);
536         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
537         request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub");
538         doThrow(new IOException()).when(publisher)
539             .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
540
541         thrown.expect(CambriaApiException.class);
542         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
543
544         //when
545         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
546
547         //then
548         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
549         verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class));
550     }
551
552     @Test
553     public void pushEvents_shouldNotPerformAnyAuthorization_whenPublishToMetricTopic() throws Exception {
554         //given
555         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
556         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
557         dMaapContext.setRequest(permittedRequest);
558         String metricsTopicName = "msgrtr.apinode.metrics.dmaap";
559         givenConfiguredWithMocks(metricsTopicName);
560         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
561
562         //when
563         eventsService.pushEvents(dMaapContext, metricsTopicName, iStream, "5", "13:00:00");
564
565         //then
566         ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
567         verify(publisher)
568             .sendBatchMessageNew(eq(metricsTopicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
569         verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
570         verify(permittedRequest, never()).isUserInRole(anyString());
571         verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class));
572         assertEquals(1, captor.getValue().getLong("count"));
573     }
574
575     @Test
576     public void pushEvents_shouldNotPerformAnyAuthorization_whenTopicHasNoOwner() throws Exception {
577         //given
578         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
579         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
580         dMaapContext.setRequest(permittedRequest);
581         String topicName = "notEnforcedAafTopic";
582         givenConfiguredWithMocks(topicName);
583         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
584         when(createdTopic.getOwner()).thenReturn(null);
585
586         //when
587         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
588
589         //then
590         ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
591         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
592         verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
593         verify(permittedRequest, never()).isUserInRole(anyString());
594         verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class));
595         assertEquals(1, captor.getValue().getLong("count"));
596     }
597
598 }