MR Java 11 Uplift
[dmaap/messagerouter/msgrtr.git] / src / test / java / org / onap / dmaap / dmf / mr / service / impl / EventsServiceImplTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP Policy Engine
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dmaap.dmf.mr.service.impl;
22
23 import static org.hamcrest.CoreMatchers.containsString;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.ArgumentMatchers.any;
28 import static org.mockito.ArgumentMatchers.anyInt;
29 import static org.mockito.ArgumentMatchers.anyLong;
30 import static org.mockito.ArgumentMatchers.anyString;
31 import static org.mockito.ArgumentMatchers.eq;
32 import static org.mockito.Mockito.doNothing;
33 import static org.mockito.Mockito.doReturn;
34 import static org.mockito.Mockito.doThrow;
35 import static org.mockito.Mockito.mock;
36 import static org.mockito.Mockito.never;
37 import static org.mockito.Mockito.verify;
38 import static org.mockito.Mockito.when;
39
40 import com.att.nsa.limits.Blacklist;
41 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
42 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
43 import java.io.ByteArrayInputStream;
44 import java.io.IOException;
45 import java.io.InputStream;
46 import java.util.ArrayList;
47 import java.util.Collections;
48 import java.util.ConcurrentModificationException;
49 import javax.servlet.http.HttpServletRequest;
50 import joptsimple.internal.Strings;
51 import org.apache.http.HttpStatus;
52 import org.apache.kafka.clients.producer.ProducerRecord;
53 import org.json.JSONObject;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.rules.ExpectedException;
58 import org.junit.runner.RunWith;
59 import org.mockito.ArgumentCaptor;
60 import org.mockito.InOrder;
61 import org.mockito.Mock;
62 import org.mockito.Mockito;
63 import org.mockito.MockitoAnnotations;
64 import org.mockito.Spy;
65 import org.mockito.junit.MockitoJUnitRunner;
66 import org.onap.dmaap.dmf.mr.CambriaApiException;
67 import org.onap.dmaap.dmf.mr.backends.Consumer;
68 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
69 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
70 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
71 import org.onap.dmaap.dmf.mr.backends.Publisher;
72 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
73 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
74 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
75 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
76 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
77 import org.onap.dmaap.dmf.mr.metabroker.Topic;
78 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
79 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
80 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
81 import org.springframework.mock.web.MockHttpServletRequest;
82 import org.springframework.mock.web.MockHttpServletResponse;
83
84 @RunWith(MockitoJUnitRunner.Silent.class)
85 public class EventsServiceImplTest {
86
87     private InputStream iStream = null;
88     private DMaaPContext dMaapContext = new DMaaPContext();
89     private DMaaPErrorMessages pErrorMessages = new DMaaPErrorMessages();
90     @Mock
91     private ConfigurationReader configurationReader;
92     @Mock
93     private Blacklist blacklist;
94     @Mock
95     private DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator;
96     @Mock
97     private NsaSimpleApiKey nsaSimpleApiKey;
98     @Mock
99     private DMaaPKafkaMetaBroker dmaapKafkaMetaBroker;
100     @Mock
101     private Topic createdTopic;
102     @Mock
103     private ConsumerFactory factory;
104     @Mock
105     private Consumer consumer;
106     @Mock
107     private Publisher publisher;
108     @Mock
109     private DMaaPCambriaLimiter limiter;
110     @Mock
111     private MetricsSet metrics;
112     @Spy
113     private EventsServiceImpl eventsService;
114
115
116     @Rule
117     public ExpectedException thrown = ExpectedException.none();
118
119     private MockHttpServletRequest request;
120
121
122     @Before
123     public void setUp() throws Exception {
124         MockitoAnnotations.initMocks(this);
125         String source = "source of my InputStream";
126         iStream = new ByteArrayInputStream(source.getBytes("UTF-8"));
127
128         request = new MockHttpServletRequest();
129         MockHttpServletResponse response = new MockHttpServletResponse();
130         dMaapContext.setRequest(request);
131         dMaapContext.setResponse(response);
132         when(blacklist.contains(anyString())).thenReturn(false);
133         when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
134         when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
135         dMaapContext.setConfigReader(configurationReader);
136         eventsService.setErrorMessages(pErrorMessages);
137         doReturn("100").when(eventsService).getPropertyFromAJSCmap("timeout");
138     }
139
140     @Test
141     public void getEvents_shouldFailOnAafAuthorization() throws Exception {
142         String topicPrefix = "org.onap.aaf.enforced";
143         String topicName = topicPrefix + ".topicName";
144         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
145         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
146         when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
147         when(eventsService.isCadiEnabled()).thenReturn(true);
148
149         thrown.expect(DMaaPAccessDeniedException.class);
150         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED)));
151
152         eventsService.getEvents(dMaapContext, topicName, "CG1", "23");
153     }
154
155     @Test
156     public void getEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception {
157         String remoteIp = "10.154.17.115";
158         request.setRemoteAddr(remoteIp);
159         when(blacklist.contains(remoteIp)).thenReturn(true);
160         when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
161
162         thrown.expect(CambriaApiException.class);
163         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN)));
164
165         eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
166     }
167
168     @Test
169     public void getEvents_shouldFail_whenRequestedTopicNotExists() throws Exception {
170         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
171         when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null);
172
173         thrown.expect(CambriaApiException.class);
174         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
175
176         eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
177     }
178
179     @Test
180     public void getEvents_shouldFail_whenConsumerLockCannotBeAcquired() throws Exception {
181         //given
182         String topicName = "testTopic345";
183         String consumerGroup = "CG5";
184         String clientId = "13";
185         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
186         when(configurationReader.getfRateLimiter()).thenReturn(limiter);
187         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
188         when(configurationReader.getfConsumerFactory()).thenReturn(factory);
189         when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
190         doThrow(new UnavailableException("Could not acquire consumer lock")).when(factory)
191                 .getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
192
193         thrown.expect(CambriaApiException.class);
194         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)));
195
196         //when
197         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
198
199         //then
200         verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
201
202     }
203
204     @Test
205     public void getEvents_shouldFail_whenBrokerServicesAreUnavailable() throws Exception {
206         String topicName = "testTopic";
207         String consumerGroup = "CG1";
208         String clientId = "23";
209         when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null);
210         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
211         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
212         when(configurationReader.getfConsumerFactory()).thenReturn(factory);
213
214         givenUserAuthorizedWithAAF(request, topicName, "sub");
215
216         thrown.expect(CambriaApiException.class);
217         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)));
218
219         //when
220         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
221
222         //then
223         verify(factory).destroyConsumer(topicName, consumerGroup, clientId);
224     }
225
226     private void givenUserAuthorizedWithAAF(MockHttpServletRequest request, String topicName, String operation) {
227         String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|" + operation;
228         request.addUserRole(permission);
229     }
230
231     @Test
232     public void getEvents_shouldHandleConcurrentModificationError() throws Exception {
233         String testTopic = "testTopic";
234         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
235         when(dmaapKafkaMetaBroker.getTopic(testTopic)).thenReturn(createdTopic);
236         when(configurationReader.getfConsumerFactory()).thenReturn(factory);
237         when(configurationReader.getfRateLimiter()).thenThrow(new ConcurrentModificationException("Error occurred"));
238         givenUserAuthorizedWithAAF(request, testTopic, "sub");
239
240         thrown.expect(CambriaApiException.class);
241         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_CONFLICT)));
242
243         eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
244     }
245
246     @Test
247     public void getEvents_shouldNotAuthorizeClient_whenSubscribingToMetricsTopic() throws Exception {
248         //given
249         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
250         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
251         dMaapContext.setRequest(permittedRequest);
252         String metricsTopicName = "msgrtr.apinode.metrics.dmaap";
253         String consumerGroup = "CG5";
254         String clientId = "7";
255         givenConfiguredWithMocks(metricsTopicName);
256         when(factory.getConsumerFor(eq(metricsTopicName), eq(consumerGroup), eq(clientId), anyInt(), any()))
257                 .thenReturn(consumer);
258         doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
259
260         //when
261         eventsService.getEvents(dMaapContext, metricsTopicName, consumerGroup, clientId);
262
263         //then
264         verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
265         verify(dmaaPAuthenticator, never()).authenticate(dMaapContext);
266         verify(permittedRequest, never()).isUserInRole(anyString());
267     }
268
269     @Test
270     public void getEvents_shouldNotAuthorizeClient_whenTopicNoteEnforcedWithAaf_andTopicHasNoOwnerSet()
271             throws Exception {
272         //given
273         String topicName = "someSimpleTopicName";
274         String consumerGroup = "CG5";
275         String clientId = "7";
276         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
277         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
278         dMaapContext.setRequest(permittedRequest);
279         givenConfiguredWithMocks(topicName);
280         when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), any()))
281                 .thenReturn(consumer);
282         doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
283         when(createdTopic.getOwner()).thenReturn(Strings.EMPTY);
284
285         //when
286         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
287
288         //then
289         verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
290         verify(dmaaPAuthenticator, never()).authenticate(dMaapContext);
291         verify(permittedRequest, never()).isUserInRole(anyString());
292     }
293
294     @Test
295     public void getEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoReadPermissionToTopic()
296             throws Exception {
297         //given
298         String topicName = "someSimpleTopicName";
299         String consumerGroup = "CG5";
300         String clientId = "7";
301         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
302         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
303         dMaapContext.setRequest(permittedRequest);
304         givenConfiguredWithMocks(topicName);
305         when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
306                 .thenReturn(consumer);
307         doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
308         when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner");
309         when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
310         doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserRead(nsaSimpleApiKey);
311
312         thrown.expect(AccessDeniedException.class);
313
314         //when
315         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
316
317         //then
318         verify(createdTopic).checkUserRead(nsaSimpleApiKey);
319         verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
320         verify(permittedRequest, never()).isUserInRole(anyString());
321     }
322
323
324     @Test
325     public void getEvents_shouldSuccessfullyRegisterConsumerToEventsStream_withAafAuthorization() throws Exception {
326         //given
327         String topicName = "testTopic";
328         String consumerGroup = "CG2";
329         String clientId = "6";
330         String messageLimit = "10";
331         String timeout = "25";
332         String meta = "yes";
333         String pretty = "on";
334         String cacheEnabled = "false";
335
336         givenConfiguredWithMocks(topicName);
337         givenConfiguredWithProperties(messageLimit, timeout, meta, pretty, cacheEnabled);
338         when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
339                 .thenReturn(consumer);
340         givenUserAuthorizedWithAAF(request, topicName, "sub");
341
342         //when
343         eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
344
345         //then
346         ArgumentCaptor<CambriaOutboundEventStream> osWriter = ArgumentCaptor.forClass(CambriaOutboundEventStream.class);
347         verifyInvocationOrderForSuccessCase(topicName, consumerGroup, clientId, osWriter);
348         assertEventStreamProperties(osWriter.getValue(), messageLimit, timeout);
349     }
350
351     private void assertEventStreamProperties(CambriaOutboundEventStream stream, String messageLimit, String timeout) {
352         assertEquals(Integer.valueOf(messageLimit).intValue(), stream.getfLimit());
353         assertEquals(Integer.valueOf(timeout).intValue(), stream.getfTimeoutMs());
354         assertTrue(stream.isfWithMeta());
355         assertTrue(stream.isfPretty());
356     }
357
358     private void givenConfiguredWithProperties(String messageLimit, String timeout, String meta, String pretty,
359                                                String cacheEnabled) {
360         when(eventsService.getPropertyFromAJSCmap("meta")).thenReturn(meta);
361         when(eventsService.getPropertyFromAJSCmap("pretty")).thenReturn(pretty);
362         when(eventsService.getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache)).thenReturn(cacheEnabled);
363         request.addParameter("timeout", timeout);
364         request.addParameter("limit", messageLimit);
365     }
366
367     private void givenConfiguredWithMocks(String topicName) throws Exception {
368         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
369         when(configurationReader.getfRateLimiter()).thenReturn(limiter);
370         when(configurationReader.getfMetrics()).thenReturn(metrics);
371         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
372         when(configurationReader.getfConsumerFactory()).thenReturn(factory);
373         when(configurationReader.getfPublisher()).thenReturn(publisher);
374     }
375
376     private void verifyInvocationOrderForSuccessCase(String topicName, String consumerGroup, String clientId,
377                                                      ArgumentCaptor<CambriaOutboundEventStream> osWriter) throws Exception {
378
379         InOrder inOrder = Mockito.inOrder(configurationReader, factory, metrics, limiter, consumer, eventsService);
380         inOrder.verify(configurationReader).getfMetrics();
381         inOrder.verify(configurationReader).getfRateLimiter();
382         inOrder.verify(limiter).onCall(eq(topicName), eq(consumerGroup), eq(clientId), anyString());
383         inOrder.verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
384         inOrder.verify(eventsService).respondOkWithStream(eq(dMaapContext), osWriter.capture());
385         inOrder.verify(consumer).commitOffsets();
386         inOrder.verify(metrics).consumeTick(anyInt());
387         inOrder.verify(limiter).onSend(eq(topicName), eq(consumerGroup), eq(clientId), anyLong());
388         inOrder.verify(consumer).close();
389         inOrder.verifyNoMoreInteractions();
390     }
391
392     @Test
393     public void pushEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception {
394         String remoteIp = "10.132.64.112";
395         request.setRemoteAddr(remoteIp);
396         when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
397         when(blacklist.contains(anyString())).thenReturn(true);
398
399         thrown.expect(CambriaApiException.class);
400         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN)));
401
402         eventsService.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
403     }
404
405
406
407
408     @Test
409     public void pushEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoWritePermissionToTopic()
410             throws Exception {
411         //given
412         String topicName = "someSimpleTopicName";
413
414         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
415         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
416         dMaapContext.setRequest(permittedRequest);
417         givenConfiguredWithMocks(topicName);
418         when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner");
419         when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
420         doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserWrite(nsaSimpleApiKey);
421
422         thrown.expect(AccessDeniedException.class);
423
424         //when
425         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
426
427         //then
428         verify(createdTopic).checkUserWrite(nsaSimpleApiKey);
429         verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
430         verify(permittedRequest, never()).isUserInRole(anyString());
431     }
432
433     @Test
434     public void pushEvents_shouldFailOnAafAuthorization_whenCadiIsEnabled_topicNameEnforced_andUserHasNoPermission()
435             throws Exception {
436         //given
437         String topicPrefix = "org.onap.aaf.enforced";
438         String topicName = topicPrefix + ".topicName";
439         String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|pub";
440         HttpServletRequest deniedRequest = mock(HttpServletRequest.class);
441         when(deniedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
442         when(deniedRequest.isUserInRole(permission)).thenReturn(false);
443         when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
444         when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
445         when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
446         when(eventsService.isCadiEnabled()).thenReturn(true);
447         dMaapContext.setRequest(deniedRequest);
448
449         thrown.expect(DMaaPAccessDeniedException.class);
450         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED)));
451
452         //when
453         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
454
455         //then
456         verify(deniedRequest).isUserInRole(permission);
457     }
458
459
460     @Test
461     public void pushEvents_shouldPublishMessagesWithoutTransaction() throws Exception {
462         //given
463         String topicName = "topicWithoutTransaction";
464         givenConfiguredWithMocks(topicName);
465         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
466
467         //when
468         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
469
470         //then
471         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
472         ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
473         verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
474         assertEquals(1, captor.getValue().getLong("count"));
475     }
476
477     @Test
478     public void pushEvents_shouldHandlePublisherError_whenPushWithoutTransaction() throws Exception {
479         //given
480         String topicName = "topicWithoutTransaction";
481         givenConfiguredWithMocks(topicName);
482         doThrow(new IOException()).when(publisher)
483                 .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
484
485         thrown.expect(CambriaApiException.class);
486         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
487
488         //when
489         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
490
491         //then
492         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
493         verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class));
494     }
495
496
497     @Test
498     public void pushEvents_shouldPublishMessagesWithTransaction() throws Exception {
499         //given
500         String topicPrefix = "org.onap.dmaap.mr";
501         String topicName = topicPrefix + ".topicWithTransaction";
502         givenConfiguredWithMocks(topicName);
503         when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
504         when(eventsService.isCadiEnabled()).thenReturn(true);
505         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
506
507         request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub");
508
509         //when
510         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
511
512         //then
513         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
514         ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
515         verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
516         assertEquals(1, captor.getValue().getLong("count"));
517         assertFalse(captor.getValue().getString("transactionId").isEmpty());
518     }
519
520     @Test
521     public void pushEvents_shouldHandlePublisherError_whenPushWithTransaction() throws Exception {
522         //given
523         String topicPrefix = "org.onap.dmaap.mr";
524         String topicName = topicPrefix + ".topicWithTransaction";
525         givenConfiguredWithMocks(topicName);
526         when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
527         when(eventsService.isCadiEnabled()).thenReturn(true);
528         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
529         request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub");
530         doThrow(new IOException()).when(publisher)
531                 .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
532
533         thrown.expect(CambriaApiException.class);
534         thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
535
536         //when
537         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
538
539         //then
540         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
541         verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class));
542     }
543
544     @Test
545     public void pushEvents_shouldNotPerformAnyAuthorization_whenPublishToMetricTopic() throws Exception {
546         //given
547         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
548         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
549         dMaapContext.setRequest(permittedRequest);
550         String metricsTopicName = "msgrtr.apinode.metrics.dmaap";
551         givenConfiguredWithMocks(metricsTopicName);
552         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
553
554         //when
555         eventsService.pushEvents(dMaapContext, metricsTopicName, iStream, "5", "13:00:00");
556
557         //then
558         ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
559         verify(publisher)
560                 .sendBatchMessageNew(eq(metricsTopicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
561         verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
562         verify(permittedRequest, never()).isUserInRole(anyString());
563         verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class));
564         assertEquals(1, captor.getValue().getLong("count"));
565     }
566
567     @Test
568     public void pushEvents_shouldNotPerformAnyAuthorization_whenTopicHasNoOwner() throws Exception {
569         //given
570         HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
571         when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
572         dMaapContext.setRequest(permittedRequest);
573         String topicName = "notEnforcedAafTopic";
574         givenConfiguredWithMocks(topicName);
575         doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
576         when(createdTopic.getOwner()).thenReturn(null);
577
578         //when
579         eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
580
581         //then
582         ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
583         verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
584         verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
585         verify(permittedRequest, never()).isUserInRole(anyString());
586         verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class));
587         assertEquals(1, captor.getValue().getLong("count"));
588     }
589
590 }