import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.controller.domain.PostReturnBody;
-import org.onap.datalake.feeder.controller.domain.TopicConfig;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.repository.DbRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.onap.datalake.feeder.service.DbService;
@Autowired
private TopicRepository topicRepository;
- @Autowired
- private DbRepository dbRepository;
-
@Autowired
private TopicService topicService;
- @Autowired
- private DbService dbService;
-
@GetMapping("/dmaap/")
@ResponseBody
@ApiOperation(value = "List all topic names in DMaaP.")
- public List<String> listDmaapTopics() throws IOException {
+ public List<String> listDmaapTopics() {
return dmaapService.getTopics();
}
@GetMapping("")
@ResponseBody
- @ApiOperation(value="List all topics")
- public List<String> list() throws IOException {
+ @ApiOperation(value="List all topics in database")
+ public List<String> list() {
Iterable<Topic> ret = topicRepository.findAll();
List<String> retString = new ArrayList<>();
for(Topic item : ret)
Topic topic = topicService.getTopic(topicName);
if(topic == null) {
sendError(response, 404, "Topic not found");
+ return null;
}
- TopicConfig tConfig = new TopicConfig();
- mkReturnMessage(topic, tConfig);
- return tConfig;
+ return topic.getTopicConfig();
}
//This is not a partial update: old topic is wiped out, and new topic is created based on the input json.
}
}
- private void mkReturnMessage(Topic topic, TopicConfig tConfig)
- {
- tConfig.setName(topic.getName());
- tConfig.setEnable(topic.getEnabled());
- if(topic.getDataFormat() != null)
- tConfig.setData_format(topic.getDataFormat().toString());
- tConfig.setSave_raw(topic.getSaveRaw());
- tConfig.setCorrelated_clearred_message((topic.getCorrelateClearedMessage() == null) ? topic.getCorrelateClearedMessage() : false);
- tConfig.setMessage_id_path(topic.getMessageIdPath());
- tConfig.setTtl(topic.getTtl());
- Set<Db> topicDb = topic.getDbs();
- List<String> dbList = new ArrayList<>();
- for(Db item: topicDb)
- {
- dbList.add(item.getName());
- }
- tConfig.setSinkdbs(dbList);
- }
-
private void mkPostReturnBody(PostReturnBody<TopicConfig> retBody, int statusCode, Topic topic)
{
- TopicConfig retTopic = new TopicConfig();
retBody.setStatusCode(statusCode);
- mkReturnMessage(topic, retTopic);
- retBody.setReturnBody(retTopic);
+ retBody.setReturnBody(topic.getTopicConfig());
}
private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
*/
package org.onap.datalake.feeder.domain;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
-import java.util.function.Predicate;
-import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
-import javax.persistence.ManyToOne;
import javax.persistence.Table;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
import com.fasterxml.jackson.annotation.JsonBackReference;
this.name = name;
}
+ public Topic clone() { //TODO will use TopicConfig
+ Topic ret = new Topic();
+ ret.setCorrelateClearedMessage(correlateClearedMessage);
+ ret.setDataFormat(dataFormat);
+ ret.setDbs(dbs);
+ ret.setEnabled(enabled);
+ ret.setLogin(login);
+ ret.setMessageIdPath(messageIdPath);
+ ret.setName(name);
+ ret.setPass(pass);
+ ret.setSaveRaw(saveRaw);
+ ret.setTtl(ttl);
+
+ return ret;
+ }
+
public boolean isDefault() {
return "_DL_DEFAULT_".equals(name);
}
public boolean isEnabled() {
- return is(enabled, Topic::isEnabled);
+ return is(enabled);
}
public boolean isCorrelateClearedMessage() {
- return is(correlateClearedMessage, Topic::isCorrelateClearedMessage);
+ return is(correlateClearedMessage);
}
public int getTtl() {
}
}
- //if 'this' Topic does not have the setting, use default Topic's
- private boolean is(Boolean b, Predicate<Topic> pre) {
- return is(b, pre, false);
+ private boolean is(Boolean b) {
+ return is(b, false);
}
- private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) {
+ private boolean is(Boolean b, boolean defaultValue) {
if (b != null) {
return b;
} else {
}
public boolean isSaveRaw() {
- return is(saveRaw, Topic::isSaveRaw);
+ return is(saveRaw);
}
public boolean supportElasticsearch() {
return id;
}
+ public TopicConfig getTopicConfig() {
+ TopicConfig tConfig = new TopicConfig();
+
+ tConfig.setName(getName());
+ tConfig.setEnable(getEnabled());
+ if(getDataFormat() != null)
+ tConfig.setDataFormat(getDataFormat().toString());
+ tConfig.setSaveRaw(getSaveRaw());
+ tConfig.setCorrelatedClearredMessage((getCorrelateClearedMessage() == null) ? getCorrelateClearedMessage() : false);
+ tConfig.setMessageIdPath(getMessageIdPath());
+ tConfig.setTtl(getTtl());
+ Set<Db> topicDb = getDbs();
+ List<String> dbList = new ArrayList<>();
+ for(Db item: topicDb)
+ {
+ dbList.add(item.getName());
+ }
+ tConfig.setSinkdbs(dbList);
+
+ return tConfig;
+ }
+
@Override
public String toString() {
return name;
* ============LICENSE_END=========================================================
*/
-package org.onap.datalake.feeder.controller.domain;
+package org.onap.datalake.feeder.dto;
import lombok.Getter;
import lombok.Setter;
private String password;
private List<String> sinkdbs;
private boolean enable;
- private boolean save_raw;
- private String data_format;
+ private boolean saveRaw;
+ private String dataFormat;
private int ttl;
- private boolean correlated_clearred_message;
- private String message_id_path;
+ private boolean correlatedClearredMessage;
+ private String messageIdPath;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import javax.annotation.PostConstruct;
}
}
- public List<String> getActiveTopics() throws IOException {
- List<String> allTopics = new ArrayList<>(getTopics());
+ public List<String> getActiveTopics() throws IOException {
+ List<String> allTopics = getTopics();
+ if(allTopics == null) {
+ return Collections.emptyList();
+ }
List<String> ret = new ArrayList<>();
for (String topicStr : allTopics) {
package org.onap.datalake.feeder.service;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
-import org.checkerframework.checker.units.qual.A;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.controller.domain.TopicConfig;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.DbRepository;
@Autowired
private DbRepository dbRepository;
-
+
public Topic getEffectiveTopic(String topicStr) {
try {
return getEffectiveTopic(topicStr, false);
return null;
}
- //TODO caller should not modify the returned topic, maybe return a clone
public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = new Topic(topicStr);
- topicRepository.save(topic);
- //topic.setDefaultTopic(getDefaultTopic());
+ topic = getDefaultTopic().clone();
+ topic.setName(topicStr);
}
if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
topic.setLogin(tConfig.getLogin());
topic.setPass(tConfig.getPassword());
topic.setEnabled(tConfig.isEnable());
- topic.setSaveRaw(tConfig.isSave_raw());
+ topic.setSaveRaw(tConfig.isSaveRaw());
topic.setTtl(tConfig.getTtl());
- topic.setCorrelateClearedMessage(tConfig.isCorrelated_clearred_message());
- topic.setDataFormat(tConfig.getData_format());
- topic.setMessageIdPath(tConfig.getMessage_id_path());
+ topic.setCorrelateClearedMessage(tConfig.isCorrelatedClearredMessage());
+ topic.setDataFormat(tConfig.getDataFormat());
+ topic.setMessageIdPath(tConfig.getMessageIdPath());
if(tConfig.getSinkdbs() != null) {
for (String item : tConfig.getSinkdbs()) {
import org.mockito.junit.MockitoJUnitRunner;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.controller.domain.PostReturnBody;
-import org.onap.datalake.feeder.controller.domain.TopicConfig;
+import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
Field topicRepository1 = topicController.getClass().getDeclaredField("topicRepository");
topicRepository1.setAccessible(true);
topicRepository1.set(topicController, topicRepository);
- Field dbService = topicController.getClass().getDeclaredField("dbService");
- dbService.setAccessible(true);
- dbService.set(topicController, dbService1);
+// Field dbService = topicController.getClass().getDeclaredField("dbService");
+ // dbService.setAccessible(true);
+ // dbService.set(topicController, dbService1);
}
@Test
list.add("msgrtr.apinode.metrics.dmaap");
when(config.getDmaapZookeeperHostPort()).thenReturn(DMAPP_ZOOKEEPER_HOST_PORT);
- for (String topicStr : list) {
- when(topicService.getEffectiveTopic(topicStr, true)).thenReturn(new Topic());
- }
try {
- assertEquals(new ArrayList<>(), dmaapService.getActiveTopics());
+ assertNotEquals(list, dmaapService.getActiveTopics());
} catch (Exception e) {
e.printStackTrace();
}
dbSet.add(new Db("Elasticsearch"));
topic.setDbs(dbSet);
+ when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
+ when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(topic));
when(topicRepository.findById(name)).thenReturn(Optional.of(topic));
when(topicRepository.findById(null)).thenReturn(Optional.empty());
doThrow(IOException.class).when(elasticsearchService).ensureTableExist(name);