+ private static final String TOPIC_FQTN = "topic_1";
+ private static final String GLOBAL_MR_HOST = "global.mr.host";
+ private TopicService topicService;
+ @Mock
+ private MR_ClientService clientService;
+ @Mock
+ private DmaapConfig dmaapConfig;
+ @Mock
+ private MR_ClusterService clusters;
+ @Mock
+ private DcaeLocationService locations;
+ @Mock
+ private MirrorMakerService bridge;
+ @Mock
+ private AafTopicSetupService aafTopicSetupService;
+
+ @Before
+ public void setUp() throws Exception {
+ given(dmaapConfig.getProperty("MR.globalHost", "global.host.not.set")).willReturn(GLOBAL_MR_HOST);
+ given(aafTopicSetupService.aafTopicSetup(any(Topic.class))).willReturn(new ApiError(200, "OK"));
+ createTopicService();
+ }
+
+ @Test
+ public void getTopics_shouldReturnTopicsReceivedDuringServiceCreation() {
+
+ ImmutableMap<String, Topic> topics = ImmutableMap.of(TOPIC_FQTN, new Topic());
+ topicService = new TopicService(topics, clientService, dmaapConfig, clusters, locations, bridge, aafTopicSetupService);
+
+ assertEquals(topics, topicService.getTopics());
+ }
+
+ @Test
+ public void getAllTopics_shouldReturnTopicsWithClients() {
+
+ ArrayList<MR_Client> mrClients = newArrayList(new MR_Client());
+ given(clientService.getAllMrClients(TOPIC_FQTN)).willReturn(mrClients);
+
+ List<Topic> allTopics = topicService.getAllTopics();
+
+ assertThat(getOnlyElement(allTopics), hasCorrectFqtn(TOPIC_FQTN));
+ assertEquals(mrClients, getOnlyElement(allTopics).getClients());
+ }
+
+ @Test
+ public void getAllTopicsWithoutClients_shouldReturnNoClients() {
+
+ List<Topic> allTopics = topicService.getAllTopicsWithoutClients();
+
+ assertThat(getOnlyElement(allTopics), hasCorrectFqtn(TOPIC_FQTN));
+ assertNull(getOnlyElement(allTopics).getClients());
+ verifyZeroInteractions(clientService);
+ }
+
+ @Test
+ public void getAllTopics_shouldCacheClients() {
+
+ ArrayList<MR_Client> mrClients = newArrayList(new MR_Client());
+ given(clientService.getAllMrClients(TOPIC_FQTN)).willReturn(mrClients);
+
+ topicService.getAllTopics();
+ List<Topic> allTopics = topicService.getAllTopicsWithoutClients();
+
+ assertThat(getOnlyElement(allTopics), hasCorrectFqtn(TOPIC_FQTN));
+ assertEquals(mrClients, getOnlyElement(allTopics).getClients());
+ }
+
+ @Test
+ public void getTopic_shouldReturnTopicByFqtn() {
+
+ ApiError apiError = new ApiError();
+ Topic topic = topicService.getTopic(TOPIC_FQTN, apiError);
+
+ assertThat(topic, hasCorrectFqtn(TOPIC_FQTN));
+ assertEquals(Response.Status.OK.getStatusCode(), apiError.getCode());
+ }
+
+ @Test
+ public void getTopic_shouldReturnTopicWithMrClients() {
+
+ ArrayList<MR_Client> mrClients = newArrayList(new MR_Client());
+ given(clientService.getAllMrClients(TOPIC_FQTN)).willReturn(mrClients);
+
+ Topic topic = topicService.getTopic(TOPIC_FQTN, new ApiError());
+
+ assertThat(topic, hasCorrectFqtn(TOPIC_FQTN));
+ assertEquals(mrClients, topic.getClients());
+ }
+
+ @Test
+ public void getTopic_shouldReturnError() {
+
+ ApiError apiError = new ApiError();
+ Topic topic = topicService.getTopic("not_existing", apiError);
+
+ assertNull(topic);
+ assertEquals(Response.Status.NOT_FOUND.getStatusCode(), apiError.getCode());
+ }
+
+ @Test
+ public void addTopic_shouldAddNewTopic() {
+ Topic newTopic = createTopic("");
+
+ ApiError apiError = new ApiError();
+ Topic addedTopic = topicService.addTopic(newTopic, apiError, true);
+
+ assertSame(newTopic, addedTopic);
+ assertEquals(Response.Status.OK.getStatusCode(), apiError.getCode());
+ assertNotNull(topicService.getTopic(addedTopic.getFqtn(), new ApiError()));
+ }
+
+ @Test
+ public void addTopic_shouldReturnErrorWhenTopicAlreadyExists() {
+ Topic newTopic = createTopic("");