2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.dmaap.dmf.mr.service.impl;
25 import static org.junit.Assert.assertEquals;
26 import static org.mockito.Matchers.any;
27 import static org.mockito.Matchers.anyBoolean;
28 import static org.mockito.Matchers.anyInt;
29 import static org.mockito.Matchers.anyString;
30 import static org.mockito.Matchers.contains;
31 import static org.mockito.Matchers.eq;
32 import static org.mockito.Mockito.doNothing;
33 import static org.mockito.Mockito.doReturn;
34 import static org.mockito.Mockito.never;
35 import static org.mockito.Mockito.verify;
36 import static org.mockito.Mockito.verifyZeroInteractions;
37 import static org.mockito.Mockito.when;
39 import com.att.nsa.configs.ConfigDbException;
40 import com.att.nsa.security.NsaAcl;
41 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
42 import com.att.nsa.security.db.simple.NsaSimpleApiKey;
43 import java.io.IOException;
44 import java.nio.file.attribute.UserPrincipal;
45 import java.security.Principal;
46 import java.util.Arrays;
47 import java.util.HashSet;
48 import javax.servlet.ServletOutputStream;
49 import javax.servlet.http.HttpServletRequest;
50 import javax.servlet.http.HttpServletResponse;
51 import org.json.JSONException;
52 import org.json.JSONObject;
53 import org.junit.Assert;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.rules.ExpectedException;
58 import org.junit.runner.RunWith;
59 import org.mockito.Mock;
60 import org.mockito.Spy;
61 import org.mockito.runners.MockitoJUnitRunner;
62 import org.onap.dmaap.dmf.mr.CambriaApiException;
63 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
64 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
65 import org.onap.dmaap.dmf.mr.beans.TopicBean;
66 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
67 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
68 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
69 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
70 import org.onap.dmaap.dmf.mr.metabroker.Topic;
71 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
72 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
/**
 * Unit tests for {@link TopicServiceImpl}, executed with the Mockito JUnit runner.
 *
 * NOTE(review): the per-field annotations are not visible in this view of the file, so the
 * comments below describe only how each field is used by the tests in this class.
 */
75 @RunWith(MockitoJUnitRunner.class)
76 public class TopicServiceImplTest {
// Permission string the createTopic tests stub and verify through HttpServletRequest.isUserInRole.
78 private static final String TOPIC_CREATE_PEM = "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:org.onap.dmaap.mr|create";
// Permission string the deleteTopic tests stub and verify through HttpServletRequest.isUserInRole.
79 private static final String TOPIC_DELETE_PEM = "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:org.onap.dmaap.mr|destroy";
// API key handed back by the stubbed getDmaapAuthenticatedUser(...) in configureSpyInstance().
80 private NsaSimpleApiKey user = new NsaSimpleApiKey("admin", "password");
// Request bean (re)built by givenTopicBean(String) and passed to createTopic(...).
81 private TopicBean topicBean;
// Service under test. Its methods are stubbed with doReturn/doNothing/when while others run for
// real, so it is presumably a Mockito spy — TODO confirm against the annotation.
84 private TopicServiceImpl topicService;
// Injected into the service by setUp() via setErrorMessages(...).
87 private DMaaPErrorMessages errorMessages;
// Context passed to every service call; setUp() stubs getRequest() to return httpServReq.
90 private DMaaPContext dmaapContext;
// Not referenced by any test visible in this file.
93 private ConfigurationReader configReader;
// Not referenced by any test visible in this file.
96 private ServletOutputStream oStream;
// Not referenced by any test visible in this file.
99 private DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator;
// Servlet request whose isUserInRole/getUserPrincipal are stubbed and verified by the tests.
102 private HttpServletRequest httpServReq;
// Not referenced by any test visible in this file.
105 private HttpServletResponse httpServRes;
// Broker returned by the stubbed getMetaBroker(...); its createTopic/getTopic calls are stubbed per test.
108 private DMaaPKafkaMetaBroker dmaapKafkaMetaBroker;
// Topic returned by the stubbed broker calls.
111 private Topic createdTopic;
// ACL returned by the stubbed getReaderAcl()/getWriterAcl() of createdTopic.
114 private NsaAcl nsaAcl;
// Expected-exception holder armed by individual tests through thrown.expect(...).
117 public ExpectedException thrown = ExpectedException.none();
120 public void setUp() throws Exception {
121 configureSpyInstance();
122 topicService.setErrorMessages(errorMessages);
124 when(dmaapContext.getRequest()).thenReturn(httpServReq);
127 private void configureSpyInstance() throws Exception {
128 doReturn(user).when(topicService).getDmaapAuthenticatedUser(any(DMaaPContext.class));
129 doReturn(dmaapKafkaMetaBroker).when(topicService).getMetaBroker(any(DMaaPContext.class));
130 doNothing().when(topicService).respondOk(any(DMaaPContext.class),anyString());
131 doNothing().when(topicService).respondOk(any(DMaaPContext.class),any(JSONObject.class));
132 when(topicService.getPropertyFromAJSCbean("enforced.topic.name.AAF"))
133 .thenReturn("org.onap.dmaap.mr");
134 when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf"))
135 .thenReturn("org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:");
138 private void givenTopicBean(String topicName) {
139 topicBean = new TopicBean();
140 topicBean.setTopicName(topicName);
145 public void createTopic_shouldSkipAAFAuthorization_whenCadiIsEnabled_andTopicNameNotEnforced() throws Exception {
147 String topicName = "UNAUTHENTICATED.PRH.REGISTRATION";
149 when(dmaapKafkaMetaBroker.createTopic(eq(topicName), any(), anyString(), anyInt(), anyInt(), anyBoolean()))
150 .thenReturn(createdTopic);
152 givenTopicBean(topicName);
155 topicService.createTopic(dmaapContext, topicBean);
158 verify(dmaapKafkaMetaBroker).createTopic(eq(topicName), any(), anyString(), anyInt(), anyInt(),
160 verify(topicService).respondOk(eq(dmaapContext), any(JSONObject.class));
161 verify(httpServReq, never()).isUserInRole(TOPIC_CREATE_PEM);
165 public void createTopic_shouldSkipAAFAuthorization_whenCADIdisabled() throws Exception {
167 String topicName = "org.onap.dmaap.mr.topic-2";
168 givenTopicBean(topicName);
170 when(dmaapKafkaMetaBroker.createTopic(eq(topicName), any(), anyString(), anyInt(), anyInt(), anyBoolean()))
171 .thenReturn(createdTopic);
174 topicService.createTopic(dmaapContext, topicBean);
177 verify(dmaapKafkaMetaBroker).createTopic(eq(topicName), any(), anyString(), anyInt(), anyInt(),
179 verify(topicService).respondOk(eq(dmaapContext), any(JSONObject.class));
180 verify(httpServReq, never()).isUserInRole(TOPIC_CREATE_PEM);
184 public void createTopic_shouldPass_whenCADIisDisabled_andNoUserInDmaapContext() throws Exception {
186 String topicName = "org.onap.dmaap.mr.topic-3";
187 givenTopicBean(topicName);
189 doReturn(null).when(topicService).getDmaapAuthenticatedUser(dmaapContext);
190 when(dmaapKafkaMetaBroker.createTopic(eq(topicName), any(), anyString(), anyInt(), anyInt(), anyBoolean()))
191 .thenReturn(createdTopic);
194 topicService.createTopic(dmaapContext, topicBean);
197 verify(dmaapKafkaMetaBroker).createTopic(eq(topicName), any(), anyString(), anyInt(), anyInt(),
199 verify(topicService).respondOk(eq(dmaapContext), any(JSONObject.class));
// Scenario: isCadiEnabled() is stubbed to true and the request grants TOPIC_CREATE_PEM.
// Expected: the role is queried, the broker creates the topic with "user" as its third
// argument, an OK response is sent and getDmaapAuthenticatedUser(...) is never consulted.
203 public void createTopic_shouldPassWithAAFauthorization_whenCadiIsEnabled_andTopicNameWithEnforcedPrefix() throws Exception {
205 String topicName = "org.onap.dmaap.mr.topic-4";
206 givenTopicBean(topicName);
// Principal returned by the stubbed request.
// NOTE(review): the body of getName() is not visible in this view; the broker stub below
// matches eq("user"), so it presumably returns "user" — confirm.
// NOTE(review): java.nio.file.attribute.UserPrincipal is used here although only
// Principal.getName() is implemented; the sibling failure test uses plain Principal.
208 Principal user = new UserPrincipal(){
210 public String getName(){
214 when(topicService.isCadiEnabled()).thenReturn(true);
215 when(httpServReq.isUserInRole(TOPIC_CREATE_PEM)).thenReturn(true);
216 when(httpServReq.getUserPrincipal()).thenReturn(user);
217 when(dmaapKafkaMetaBroker.createTopic(eq(topicName), any(), eq("user"), anyInt(), anyInt(), anyBoolean()))
218 .thenReturn(createdTopic);
// Exercise the service.
221 topicService.createTopic(dmaapContext, topicBean);
// Verify the role check, the broker call, the OK response and that the API-key path was skipped.
224 verify(httpServReq).isUserInRole(TOPIC_CREATE_PEM);
225 verify(dmaapKafkaMetaBroker).createTopic(eq(topicName), any(), eq("user"), anyInt(), anyInt(), anyBoolean());
226 verify(topicService).respondOk(eq(dmaapContext), any(JSONObject.class));
227 verify(topicService, never()).getDmaapAuthenticatedUser(dmaapContext);
// Scenario: isCadiEnabled() is stubbed to true and the request denies TOPIC_CREATE_PEM.
// Expected: createTopic(...) throws DMaaPAccessDeniedException.
231 public void createTopic_shouldFailWithAAFauthorization_whenCadiIsEnabled_andTopicNameWithEnforcedPrefix() throws Exception {
// Arm the expected-exception holder before any interaction with the service.
233 thrown.expect(DMaaPAccessDeniedException.class);
235 String topicName = "org.onap.dmaap.mr.topic-5";
236 givenTopicBean(topicName);
// Principal returned by the stubbed request.
// NOTE(review): the body of getName() is not visible in this view of the file.
238 Principal user = new Principal(){
240 public String getName(){
244 when(topicService.isCadiEnabled()).thenReturn(true);
245 when(httpServReq.isUserInRole(TOPIC_CREATE_PEM)).thenReturn(false);
246 when(httpServReq.getUserPrincipal()).thenReturn(user);
// Exercise the service; this call is expected to throw.
249 topicService.createTopic(dmaapContext, topicBean);
// NOTE(review): when the call above throws as expected, control leaves the method there and
// none of the verifications below execute — consider a try/catch so they are checked.
252 verify(httpServReq).isUserInRole(TOPIC_CREATE_PEM);
253 verify(topicService, never()).getDmaapAuthenticatedUser(dmaapContext);
254 verifyZeroInteractions(dmaapKafkaMetaBroker);
255 verifyZeroInteractions(createdTopic);
259 public void createTopic_shouldThrowApiException_whenBrokerThrowsConfigDbException() throws Exception {
261 thrown.expect(CambriaApiException.class);
263 String topicName = "org.onap.dmaap.mr.topic-6";
264 givenTopicBean(topicName);
266 when(dmaapKafkaMetaBroker.createTopic(eq(topicName), any(), any(), anyInt(), anyInt(), anyBoolean()))
267 .thenThrow(new ConfigDbException("fail"));
270 topicService.createTopic(dmaapContext, topicBean);
273 verifyZeroInteractions(createdTopic);
277 public void createTopic_shouldFailGracefully_whenTopicExistsExceptionOccurs() throws Exception {
279 String topicName = "org.onap.dmaap.mr.topic-7";
280 givenTopicBean(topicName);
282 when(dmaapKafkaMetaBroker.createTopic(eq(topicName), any(), anyString(), anyInt(), anyInt(), anyBoolean()))
283 .thenThrow(new Broker1.TopicExistsException("enfTopicNamePlusExtra"));
286 topicService.createTopic(dmaapContext, topicBean);
289 verifyZeroInteractions(createdTopic);
293 public void getIntValueOrDefault_shouldReturnOne_whenDefaultNotProvided() {
295 String defaultPropertyName = "propertyName";
296 when(topicService.getPropertyFromAJSCmap(defaultPropertyName)).thenReturn("");
299 int extracted = topicService.getIntValueOrDefault(defaultPropertyName);
302 assertEquals(1, extracted);
306 public void getIntValueOrDefault_shouldReturnOne_whenValueIsNegative() {
308 String defaultPropertyName = "propertyName";
309 when(topicService.getPropertyFromAJSCmap(defaultPropertyName)).thenReturn("-1");
312 int extracted = topicService.getIntValueOrDefault(defaultPropertyName);
315 assertEquals(1, extracted);
319 public void getIntValueOrDefault_shouldParseDefaultAndReturnIt_whenGivenValueIsPositive() {
321 String defaultPropertyName = "propertyName";
322 when(topicService.getPropertyFromAJSCmap(defaultPropertyName)).thenReturn("3");
325 int extracted = topicService.getIntValueOrDefault(defaultPropertyName);
328 assertEquals(3, extracted);
331 @Test(expected = TopicExistsException.class)
332 public void testGetTopics_null_topic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
333 TopicExistsException, JSONException, ConfigDbException {
335 Assert.assertNotNull(topicService);
336 when(dmaapKafkaMetaBroker.getTopic(anyString())).thenReturn(null);
338 topicService.getTopic(dmaapContext, "topicName");
342 public void testGetTopics_NonNull_topic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
343 TopicExistsException, JSONException, ConfigDbException {
345 Assert.assertNotNull(topicService);
347 when(dmaapKafkaMetaBroker.getTopic(anyString())).thenReturn(createdTopic);
349 when(createdTopic.getName()).thenReturn("topicName");
350 when(createdTopic.getDescription()).thenReturn("topicDescription");
351 when(createdTopic.getOwners()).thenReturn(new HashSet<>(Arrays.asList("user1,user2".split(","))));
353 when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
354 when(createdTopic.getWriterAcl()).thenReturn(nsaAcl);
356 topicService.getTopic(dmaapContext, "topicName");
359 @Test(expected = TopicExistsException.class)
360 public void testGetPublishersByTopicName_nullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
361 IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
363 Assert.assertNotNull(topicService);
365 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
367 topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
372 public void testGetPublishersByTopicName_nonNullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
373 IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
375 Assert.assertNotNull(topicService);
377 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
378 when(createdTopic.getWriterAcl()).thenReturn(nsaAcl);
379 topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
382 @Test(expected = TopicExistsException.class)
383 public void testGetConsumersByTopicName_nullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
384 IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
386 Assert.assertNotNull(topicService);
388 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
390 topicService.getConsumersByTopicName(dmaapContext, "topicNamespace.name");
395 public void testGetConsumersByTopicName_nonNullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
396 IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
398 Assert.assertNotNull(topicService);
400 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
402 when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
404 topicService.getConsumersByTopicName(dmaapContext, "topicNamespace.name");
408 public void testGetPublishersByTopicName() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
409 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
411 Assert.assertNotNull(topicService);
413 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
415 topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
418 @Test(expected = TopicExistsException.class)
419 public void testGetPublishersByTopicNameError() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
420 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
422 Assert.assertNotNull(topicService);
424 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
426 topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
430 public void deleteTopic_shouldDeleteTopic_whenUserAuthorizedWithAAF_andTopicExists() throws Exception {
432 String topicName = "org.onap.dmaap.mr.topic-9";
433 when(topicService.isCadiEnabled()).thenReturn(true);
434 when(httpServReq.isUserInRole(TOPIC_DELETE_PEM)).thenReturn(true);
435 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
438 topicService.deleteTopic(dmaapContext, topicName);
441 verify(httpServReq).isUserInRole(TOPIC_DELETE_PEM);
442 verify(topicService).respondOk(eq(dmaapContext), contains(topicName));
443 verify(topicService, never()).getDmaapAuthenticatedUser(dmaapContext);
447 public void deleteTopic_shouldSkipAAFauthorization_whenTopicNameNotEnforced() throws Exception {
449 String topicName = "UNAUTHENTICATED.PRH.READY";
450 when(topicService.isCadiEnabled()).thenReturn(true);
451 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
454 topicService.deleteTopic(dmaapContext, topicName);
457 verify(httpServReq, never()).isUserInRole(TOPIC_DELETE_PEM);
458 verify(topicService).respondOk(eq(dmaapContext), contains(topicName));
462 public void deleteTopic_shouldDeleteTopic_whenUserAuthorizedInContext_andTopicExists() throws Exception {
464 String topicName = "org.onap.dmaap.mr.topic-10";
465 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
468 topicService.deleteTopic(dmaapContext, topicName);
471 verify(httpServReq, never()).isUserInRole(TOPIC_DELETE_PEM);
472 verify(topicService).respondOk(eq(dmaapContext), contains(topicName));
476 public void deleteTopic_shouldNotDeleteTopic_whenUserNotAuthorizedByAAF() throws Exception {
478 String topicName = "org.onap.dmaap.mr.topic-10";
479 thrown.expect(DMaaPAccessDeniedException.class);
481 when(topicService.isCadiEnabled()).thenReturn(true);
482 when(httpServReq.isUserInRole(TOPIC_DELETE_PEM)).thenReturn(false);
485 topicService.deleteTopic(dmaapContext, topicName);
488 verify(httpServReq).isUserInRole(TOPIC_DELETE_PEM);
489 verify(topicService, never()).respondOk(eq(dmaapContext), anyString());
490 verify(topicService, never()).getDmaapAuthenticatedUser(dmaapContext);
494 public void deleteTopic_shouldNotDeleteTopic_whenTopicDoesNotExist() throws Exception {
496 String topicName = "org.onap.dmaap.mr.topic-10";
497 thrown.expect(TopicExistsException.class);
499 when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(null);
502 topicService.deleteTopic(dmaapContext, topicName);
505 verify(topicService, never()).respondOk(eq(dmaapContext), anyString());
509 public void testPermitConsumerForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
510 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
512 Assert.assertNotNull(topicService);
513 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
514 TopicBean topicBean = new TopicBean();
515 topicBean.setTopicName("enfTopicNamePlusExtra");
517 topicService.permitConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
520 @Test(expected = TopicExistsException.class)
521 public void testPermitConsumerForTopic_nulltopic()
522 throws DMaaPAccessDeniedException, CambriaApiException, IOException,
523 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
525 Assert.assertNotNull(topicService);
526 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
527 TopicBean topicBean = new TopicBean();
528 topicBean.setTopicName("enfTopicNamePlusExtra");
530 topicService.permitConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
534 public void testdenyConsumerForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
535 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
537 Assert.assertNotNull(topicService);
538 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
539 TopicBean topicBean = new TopicBean();
540 topicBean.setTopicName("enfTopicNamePlusExtra");
542 topicService.denyConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
545 @Test(expected = TopicExistsException.class)
546 public void testdenyConsumerForTopic_nulltopic()
547 throws DMaaPAccessDeniedException, CambriaApiException, IOException,
548 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
550 Assert.assertNotNull(topicService);
552 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
553 TopicBean topicBean = new TopicBean();
554 topicBean.setTopicName("enfTopicNamePlusExtra");
556 topicService.denyConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
561 public void testPermitPublisherForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
562 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
564 Assert.assertNotNull(topicService);
566 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
567 TopicBean topicBean = new TopicBean();
568 topicBean.setTopicName("enfTopicNamePlusExtra");
570 topicService.permitPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
573 @Test(expected = TopicExistsException.class)
574 public void testPermitPublisherForTopic_nulltopic()
575 throws DMaaPAccessDeniedException, CambriaApiException, IOException,
576 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
578 Assert.assertNotNull(topicService);
580 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
581 TopicBean topicBean = new TopicBean();
582 topicBean.setTopicName("enfTopicNamePlusExtra");
584 topicService.permitPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
588 public void testDenyPublisherForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
589 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
591 Assert.assertNotNull(topicService);
593 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
594 TopicBean topicBean = new TopicBean();
595 topicBean.setTopicName("enfTopicNamePlusExtra");
597 topicService.denyPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
601 @Test(expected = TopicExistsException.class)
602 public void testDenyPublisherForTopic_nulltopic()
603 throws DMaaPAccessDeniedException, CambriaApiException, IOException,
604 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
606 Assert.assertNotNull(topicService);
608 when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
609 TopicBean topicBean = new TopicBean();
610 topicBean.setTopicName("enfTopicNamePlusExtra");
612 topicService.denyPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
617 public void testGetAllTopics() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
618 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
620 Assert.assertNotNull(topicService);
622 TopicBean topicBean = new TopicBean();
623 topicBean.setTopicName("enfTopicNamePlusExtra");
625 topicService.getAllTopics(dmaapContext);
629 public void testGetTopics() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
630 TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
632 Assert.assertNotNull(topicService);
634 TopicBean topicBean = new TopicBean();
635 topicBean.setTopicName("enfTopicNamePlusExtra");
637 topicService.getTopics(dmaapContext);