2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.datalake.feeder.service;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.Mockito.doThrow;
29 import static org.mockito.Mockito.when;
31 import java.io.IOException;
34 import org.elasticsearch.client.IndicesClient;
35 import org.elasticsearch.client.RequestOptions;
36 import org.elasticsearch.client.RestHighLevelClient;
37 import org.elasticsearch.client.indices.GetIndexRequest;
38 import org.junit.Test;
39 import org.junit.runner.RunWith;
40 import org.mockito.InjectMocks;
41 import org.mockito.Mock;
42 import org.mockito.Mockito;
43 import org.mockito.invocation.InvocationOnMock;
44 import org.mockito.junit.MockitoJUnitRunner;
45 import org.mockito.stubbing.Answer;
46 import org.onap.datalake.feeder.config.ApplicationConfiguration;
47 import org.onap.datalake.feeder.domain.*;
48 import org.onap.datalake.feeder.dto.TopicConfig;
49 import org.onap.datalake.feeder.enumeration.DbTypeEnum;
50 import org.onap.datalake.feeder.repository.DbRepository;
51 import org.onap.datalake.feeder.repository.TopicNameRepository;
52 import org.onap.datalake.feeder.repository.TopicRepository;
53 import org.onap.datalake.feeder.service.db.ElasticsearchService;
56 * Test Service for Topic
61 @RunWith(MockitoJUnitRunner.class)
62 public class TopicServiceTest {
64 static String DEFAULT_TOPIC_NAME = "_DL_DEFAULT_";
67 private ApplicationConfiguration config;
70 private TopicRepository topicRepository;
73 private ElasticsearchService elasticsearchService;
76 private DbService dbService;
79 private DbRepository dbRepository;
82 private TopicNameRepository topicNameRepository;
85 private TopicService topicService;
87 @Test(expected = NullPointerException.class)
88 public void testGetTopic() throws IOException{
89 List<Topic> topics = new ArrayList<>();
90 Topic topic = new Topic();
91 DbType dbType = new DbType();
92 Set<Kafka> kafkas = new HashSet<>();
93 Set<Db> dbs = new HashSet<>();
95 db.setName("Elasticsearch");
101 Kafka kafka = new Kafka();
102 kafka.setName("1234");
105 TopicName topicName = new TopicName();
106 topicName.setId("1234");
108 topic.setTopicName(topicName);
109 topic.setKafkas(kafkas);
110 topic.setEnabled(true);
113 when(topicRepository.findAll()).thenReturn(topics);
114 when((ElasticsearchService)dbService.findDbStoreService(db)).thenReturn(new ElasticsearchService(db));
115 topicService.findTopics(kafka,topicName.getId());
116 topicService.getEnabledEffectiveTopic(kafka,topicName.getId(),true);
120 public void testGetTopicNull() {
121 Topic topic = new Topic();
122 TopicName topicName = new TopicName();
123 topicName.setId("_DL_DEFAULT_");
125 topic.setTopicName(topicName);
126 Optional<Topic> optional = Optional.of(topic);
127 when(topicRepository.findById(0)).thenReturn(optional);
128 when(config.getDefaultTopicName()).thenReturn("_DL_DEFAULT_");
129 assertEquals(topic,topicService.getTopic(0));
130 assertTrue(topicService.isDefaultTopic(topic));
134 public void testFillTopic(){
135 TopicConfig tConfig = new TopicConfig();
137 tConfig.setName("1234");
138 tConfig.setLogin("1234");
139 tConfig.setPassword("1234");
140 tConfig.setEnabled(true);
141 tConfig.setSaveRaw(true);
142 tConfig.setDataFormat("1234");
143 tConfig.setTtl(1234);
144 tConfig.setCorrelateClearedMessage(true);
145 tConfig.setMessageIdPath("1234");
146 tConfig.setAggregateArrayPath("1234");
147 tConfig.setFlattenArrayPath("1234");
148 List<Integer> sinkdbs = new ArrayList<>();
150 tConfig.setSinkdbs(sinkdbs);
155 TopicName topicName = new TopicName();
156 topicName.setId("1234");
158 Optional<TopicName> optional = Optional.of(topicName);
159 when(dbRepository.findById(1234)).thenReturn(Optional.of(db));
160 when(topicNameRepository.findById(tConfig.getName())).thenReturn(optional);
162 topicService.fillTopicConfiguration(tConfig);
167 public void testGetEffectiveTopic() throws IOException {
169 Topic topic = new Topic(name);
170 topic.setEnabled(true);
171 Set<Db> dbSet = new HashSet<>();
172 dbSet.add(new Db("Elasticsearch"));
175 when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
176 when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(topic));
177 when(topicRepository.findById(name)).thenReturn(Optional.of(topic));
178 when(topicRepository.findById(null)).thenReturn(Optional.empty());
180 assertEquals(topicService.getEffectiveTopic(name), topicService.getEffectiveTopic(name, false));
182 assertNotNull(topicService.getEffectiveTopic(null));
184 topicService.getEffectiveTopic(name, true);