2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dmaap.dmf.mr.service.impl;
23 import static org.hamcrest.CoreMatchers.containsString;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Matchers.any;
28 import static org.mockito.Matchers.anyInt;
29 import static org.mockito.Matchers.anyLong;
30 import static org.mockito.Matchers.anyString;
31 import static org.mockito.Matchers.eq;
32 import static org.mockito.Mockito.doNothing;
33 import static org.mockito.Mockito.doReturn;
34 import static org.mockito.Mockito.doThrow;
35 import static org.mockito.Mockito.mock;
36 import static org.mockito.Mockito.never;
37 import static org.mockito.Mockito.verify;
38 import static org.mockito.Mockito.when;
40 import com.att.nsa.limits.Blacklist;
41 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
42 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
43 import java.io.ByteArrayInputStream;
44 import java.io.IOException;
45 import java.io.InputStream;
46 import java.util.ArrayList;
47 import java.util.Collections;
48 import java.util.ConcurrentModificationException;
49 import javax.servlet.http.HttpServletRequest;
50 import joptsimple.internal.Strings;
51 import org.apache.http.HttpStatus;
52 import org.apache.kafka.clients.producer.ProducerRecord;
53 import org.json.JSONObject;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.rules.ExpectedException;
58 import org.junit.runner.RunWith;
59 import org.mockito.ArgumentCaptor;
60 import org.mockito.InOrder;
61 import org.mockito.Mock;
62 import org.mockito.Mockito;
63 import org.mockito.MockitoAnnotations;
64 import org.mockito.Spy;
65 import org.mockito.runners.MockitoJUnitRunner;
66 import org.onap.dmaap.dmf.mr.CambriaApiException;
67 import org.onap.dmaap.dmf.mr.backends.Consumer;
68 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
69 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
70 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
71 import org.onap.dmaap.dmf.mr.backends.Publisher;
72 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
73 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
74 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
75 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
76 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
77 import org.onap.dmaap.dmf.mr.metabroker.Topic;
78 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
79 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
80 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
81 import org.springframework.mock.web.MockHttpServletRequest;
82 import org.springframework.mock.web.MockHttpServletResponse;
84 @RunWith(MockitoJUnitRunner.class)
85 public class EventsServiceImplTest {
87 private InputStream iStream = null;
88 private DMaaPContext dMaapContext = new DMaaPContext();
89 private DMaaPErrorMessages pErrorMessages = new DMaaPErrorMessages();
91 private ConfigurationReader configurationReader;
93 private Blacklist blacklist;
95 private DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator;
97 private NsaSimpleApiKey nsaSimpleApiKey;
99 private DMaaPKafkaMetaBroker dmaapKafkaMetaBroker;
101 private Topic createdTopic;
103 private ConsumerFactory factory;
105 private Consumer consumer;
107 private Publisher publisher;
109 private DMaaPCambriaLimiter limiter;
111 private MetricsSet metrics;
113 private EventsServiceImpl eventsService;
117 public ExpectedException thrown = ExpectedException.none();
119 private MockHttpServletRequest request;
122 public void setUp() throws Exception {
123 MockitoAnnotations.initMocks(this);
124 String source = "source of my InputStream";
125 iStream = new ByteArrayInputStream(source.getBytes("UTF-8"));
127 request = new MockHttpServletRequest();
128 MockHttpServletResponse response = new MockHttpServletResponse();
129 dMaapContext.setRequest(request);
130 dMaapContext.setResponse(response);
131 when(blacklist.contains(anyString())).thenReturn(false);
132 when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
133 when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
134 dMaapContext.setConfigReader(configurationReader);
135 eventsService.setErrorMessages(pErrorMessages);
136 doReturn("100").when(eventsService).getPropertyFromAJSCmap("timeout");
140 public void getEvents_shouldFailOnAafAuthorization() throws Exception {
141 String topicPrefix = "org.onap.aaf.enforced";
142 String topicName = topicPrefix + ".topicName";
143 when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
144 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
145 when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
146 when(eventsService.isCadiEnabled()).thenReturn(true);
148 thrown.expect(DMaaPAccessDeniedException.class);
149 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED)));
151 eventsService.getEvents(dMaapContext, topicName, "CG1", "23");
155 public void getEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception {
156 String remoteIp = "10.154.17.115";
157 request.setRemoteAddr(remoteIp);
158 when(blacklist.contains(remoteIp)).thenReturn(true);
159 when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
161 thrown.expect(CambriaApiException.class);
162 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN)));
164 eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
168 public void getEvents_shouldFail_whenRequestedTopicNotExists() throws Exception {
169 when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
170 when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null);
172 thrown.expect(CambriaApiException.class);
173 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
175 eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
179 public void getEvents_shouldFail_whenConsumerLockCannotBeAcquired() throws Exception {
181 String topicName = "testTopic345";
182 String consumerGroup = "CG5";
183 String clientId = "13";
184 when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
185 when(configurationReader.getfRateLimiter()).thenReturn(limiter);
186 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
187 when(configurationReader.getfConsumerFactory()).thenReturn(factory);
188 when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
189 doThrow(new UnavailableException("Could not acquire consumer lock")).when(factory)
190 .getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
192 thrown.expect(CambriaApiException.class);
193 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)));
196 eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
199 verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
204 public void getEvents_shouldFail_whenBrokerServicesAreUnavailable() throws Exception {
205 String topicName = "testTopic";
206 String consumerGroup = "CG1";
207 String clientId = "23";
208 when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null);
209 when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
210 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
211 when(configurationReader.getfConsumerFactory()).thenReturn(factory);
213 givenUserAuthorizedWithAAF(request, topicName, "sub");
215 thrown.expect(CambriaApiException.class);
216 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)));
219 eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
222 verify(factory).destroyConsumer(topicName, consumerGroup, clientId);
225 private void givenUserAuthorizedWithAAF(MockHttpServletRequest request, String topicName, String operation) {
226 String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|" + operation;
227 request.addUserRole(permission);
231 public void getEvents_shouldHandleConcurrentModificationError() throws Exception {
232 String testTopic = "testTopic";
233 when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
234 when(dmaapKafkaMetaBroker.getTopic(testTopic)).thenReturn(createdTopic);
235 when(configurationReader.getfConsumerFactory()).thenReturn(factory);
236 when(configurationReader.getfRateLimiter()).thenThrow(new ConcurrentModificationException("Error occurred"));
237 givenUserAuthorizedWithAAF(request, testTopic, "sub");
239 thrown.expect(CambriaApiException.class);
240 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_CONFLICT)));
242 eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
246 public void getEvents_shouldNotAuthorizeClient_whenSubscribingToMetricsTopic() throws Exception {
248 HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
249 when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
250 dMaapContext.setRequest(permittedRequest);
251 String metricsTopicName = "msgrtr.apinode.metrics.dmaap";
252 String consumerGroup = "CG5";
253 String clientId = "7";
254 givenConfiguredWithMocks(metricsTopicName);
255 when(factory.getConsumerFor(eq(metricsTopicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
256 .thenReturn(consumer);
257 doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
260 eventsService.getEvents(dMaapContext, metricsTopicName, consumerGroup, clientId);
263 verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
264 verify(dmaaPAuthenticator, never()).authenticate(dMaapContext);
265 verify(permittedRequest, never()).isUserInRole(anyString());
269 public void getEvents_shouldNotAuthorizeClient_whenTopicNoteEnforcedWithAaf_andTopicHasNoOwnerSet()
272 String topicName = "someSimpleTopicName";
273 String consumerGroup = "CG5";
274 String clientId = "7";
275 HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
276 when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
277 dMaapContext.setRequest(permittedRequest);
278 givenConfiguredWithMocks(topicName);
279 when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
280 .thenReturn(consumer);
281 doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
282 when(createdTopic.getOwner()).thenReturn(Strings.EMPTY);
285 eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
288 verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
289 verify(dmaaPAuthenticator, never()).authenticate(dMaapContext);
290 verify(permittedRequest, never()).isUserInRole(anyString());
294 public void getEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoReadPermissionToTopic()
297 String topicName = "someSimpleTopicName";
298 String consumerGroup = "CG5";
299 String clientId = "7";
300 HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
301 when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
302 dMaapContext.setRequest(permittedRequest);
303 givenConfiguredWithMocks(topicName);
304 when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
305 .thenReturn(consumer);
306 doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
307 when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner");
308 when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
309 doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserRead(nsaSimpleApiKey);
311 thrown.expect(AccessDeniedException.class);
314 eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
317 verify(createdTopic).checkUserRead(nsaSimpleApiKey);
318 verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
319 verify(permittedRequest, never()).isUserInRole(anyString());
324 public void getEvents_shouldSuccessfullyRegisterConsumerToEventsStream_withAafAuthorization() throws Exception {
326 String topicName = "testTopic";
327 String consumerGroup = "CG2";
328 String clientId = "6";
329 String messageLimit = "10";
330 String timeout = "25";
332 String pretty = "on";
333 String cacheEnabled = "false";
335 givenConfiguredWithMocks(topicName);
336 givenConfiguredWithProperties(messageLimit, timeout, meta, pretty, cacheEnabled);
337 when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
338 .thenReturn(consumer);
339 givenUserAuthorizedWithAAF(request, topicName, "sub");
342 eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
345 ArgumentCaptor<CambriaOutboundEventStream> osWriter = ArgumentCaptor.forClass(CambriaOutboundEventStream.class);
346 verifyInvocationOrderForSuccessCase(topicName, consumerGroup, clientId, osWriter);
347 assertEventStreamProperties(osWriter.getValue(), messageLimit, timeout);
350 private void assertEventStreamProperties(CambriaOutboundEventStream stream, String messageLimit, String timeout) {
351 assertEquals(Integer.valueOf(messageLimit).intValue(), stream.getfLimit());
352 assertEquals(Integer.valueOf(timeout).intValue(), stream.getfTimeoutMs());
353 assertTrue(stream.isfWithMeta());
354 assertTrue(stream.isfPretty());
357 private void givenConfiguredWithProperties(String messageLimit, String timeout, String meta, String pretty,
358 String cacheEnabled) {
359 when(eventsService.getPropertyFromAJSCmap("meta")).thenReturn(meta);
360 when(eventsService.getPropertyFromAJSCmap("pretty")).thenReturn(pretty);
361 when(eventsService.getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache)).thenReturn(cacheEnabled);
362 request.addParameter("timeout", timeout);
363 request.addParameter("limit", messageLimit);
366 private void givenConfiguredWithMocks(String topicName) throws Exception {
367 when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
368 when(configurationReader.getfRateLimiter()).thenReturn(limiter);
369 when(configurationReader.getfMetrics()).thenReturn(metrics);
370 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
371 when(configurationReader.getfConsumerFactory()).thenReturn(factory);
372 when(configurationReader.getfPublisher()).thenReturn(publisher);
375 private void verifyInvocationOrderForSuccessCase(String topicName, String consumerGroup, String clientId,
376 ArgumentCaptor<CambriaOutboundEventStream> osWriter) throws Exception {
378 InOrder inOrder = Mockito.inOrder(configurationReader, factory, metrics, limiter, consumer, eventsService);
379 inOrder.verify(configurationReader).getfMetrics();
380 inOrder.verify(configurationReader).getfRateLimiter();
381 inOrder.verify(limiter).onCall(eq(topicName), eq(consumerGroup), eq(clientId), anyString());
382 inOrder.verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
383 inOrder.verify(eventsService).respondOkWithStream(eq(dMaapContext), osWriter.capture());
384 inOrder.verify(consumer).commitOffsets();
385 inOrder.verify(metrics).consumeTick(anyInt());
386 inOrder.verify(limiter).onSend(eq(topicName), eq(consumerGroup), eq(clientId), anyLong());
387 inOrder.verify(consumer).close();
388 inOrder.verifyNoMoreInteractions();
392 public void pushEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception {
393 String remoteIp = "10.132.64.112";
394 request.setRemoteAddr(remoteIp);
395 when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
396 when(blacklist.contains(anyString())).thenReturn(true);
398 thrown.expect(CambriaApiException.class);
399 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN)));
401 eventsService.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
406 public void pushEvents_shouldFail_whenRequestedTopicDoesNotExist() throws Exception {
407 when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
408 when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null);
410 thrown.expect(CambriaApiException.class);
411 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
413 eventsService.pushEvents(dMaapContext, "testTopic", iStream, "5", "13:00:00");
417 public void pushEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoWritePermissionToTopic()
420 String topicName = "someSimpleTopicName";
422 HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
423 when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
424 dMaapContext.setRequest(permittedRequest);
425 givenConfiguredWithMocks(topicName);
426 when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner");
427 when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
428 doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserWrite(nsaSimpleApiKey);
430 thrown.expect(AccessDeniedException.class);
433 eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
436 verify(createdTopic).checkUserWrite(nsaSimpleApiKey);
437 verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
438 verify(permittedRequest, never()).isUserInRole(anyString());
442 public void pushEvents_shouldFailOnAafAuthorization_whenCadiIsEnabled_topicNameEnforced_andUserHasNoPermission()
445 String topicPrefix = "org.onap.aaf.enforced";
446 String topicName = topicPrefix + ".topicName";
447 String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|pub";
448 HttpServletRequest deniedRequest = mock(HttpServletRequest.class);
449 when(deniedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
450 when(deniedRequest.isUserInRole(permission)).thenReturn(false);
451 when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
452 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
453 when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
454 when(eventsService.isCadiEnabled()).thenReturn(true);
455 dMaapContext.setRequest(deniedRequest);
457 thrown.expect(DMaaPAccessDeniedException.class);
458 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED)));
461 eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
464 verify(deniedRequest).isUserInRole(permission);
469 public void pushEvents_shouldPublishMessagesWithoutTransaction() throws Exception {
471 String topicName = "topicWithoutTransaction";
472 givenConfiguredWithMocks(topicName);
473 doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
476 eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
479 verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
480 ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
481 verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
482 assertEquals(1, captor.getValue().getLong("count"));
486 public void pushEvents_shouldHandlePublisherError_whenPushWithoutTransaction() throws Exception {
488 String topicName = "topicWithoutTransaction";
489 givenConfiguredWithMocks(topicName);
490 doThrow(new IOException()).when(publisher)
491 .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
493 thrown.expect(CambriaApiException.class);
494 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
497 eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
500 verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
501 verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class));
506 public void pushEvents_shouldPublishMessagesWithTransaction() throws Exception {
508 String topicPrefix = "org.onap.dmaap.mr";
509 String topicName = topicPrefix + ".topicWithTransaction";
510 givenConfiguredWithMocks(topicName);
511 when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
512 when(eventsService.isCadiEnabled()).thenReturn(true);
513 doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
515 request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub");
518 eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
521 verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
522 ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
523 verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
524 assertEquals(1, captor.getValue().getLong("count"));
525 assertFalse(captor.getValue().getString("transactionId").isEmpty());
529 public void pushEvents_shouldHandlePublisherError_whenPushWithTransaction() throws Exception {
531 String topicPrefix = "org.onap.dmaap.mr";
532 String topicName = topicPrefix + ".topicWithTransaction";
533 givenConfiguredWithMocks(topicName);
534 when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
535 when(eventsService.isCadiEnabled()).thenReturn(true);
536 doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
537 request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub");
538 doThrow(new IOException()).when(publisher)
539 .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
541 thrown.expect(CambriaApiException.class);
542 thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
545 eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
548 verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
549 verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class));
553 public void pushEvents_shouldNotPerformAnyAuthorization_whenPublishToMetricTopic() throws Exception {
555 HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
556 when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
557 dMaapContext.setRequest(permittedRequest);
558 String metricsTopicName = "msgrtr.apinode.metrics.dmaap";
559 givenConfiguredWithMocks(metricsTopicName);
560 doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
563 eventsService.pushEvents(dMaapContext, metricsTopicName, iStream, "5", "13:00:00");
566 ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
568 .sendBatchMessageNew(eq(metricsTopicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
569 verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
570 verify(permittedRequest, never()).isUserInRole(anyString());
571 verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class));
572 assertEquals(1, captor.getValue().getLong("count"));
576 public void pushEvents_shouldNotPerformAnyAuthorization_whenTopicHasNoOwner() throws Exception {
578 HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
579 when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
580 dMaapContext.setRequest(permittedRequest);
581 String topicName = "notEnforcedAafTopic";
582 givenConfiguredWithMocks(topicName);
583 doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
584 when(createdTopic.getOwner()).thenReturn(null);
587 eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
590 ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
591 verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
592 verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
593 verify(permittedRequest, never()).isUserInRole(anyString());
594 verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class));
595 assertEquals(1, captor.getValue().getLong("count"));