tConfig.setAggregateArrayPath(getAggregateArrayPath());
tConfig.setFlattenArrayPath(getFlattenArrayPath());
tConfig.setTtl(getTtl());
+
Set<Db> topicDb = getDbs();
List<String> dbList = new ArrayList<>();
+ List<String> enabledDbList = new ArrayList<>();
if (topicDb != null) {
for (Db item : topicDb) {
dbList.add(item.getName());
+ if(item.isEnabled()) {
+ enabledDbList.add(item.getName());
+ }
}
}
tConfig.setSinkdbs(dbList);
+ tConfig.setEnabledSinkdbs(enabledDbList);
return tConfig;
}
private String login;
private String password;
private List<String> sinkdbs;
+ private List<String> enabledSinkdbs;//only include enabled db
private boolean enabled;
private boolean saveRaw;
private String dataFormat;
}
public boolean supportHdfs() {
- return containDb("HDFS");
+ return supportDb("HDFS");
}
public boolean supportElasticsearch() {
- return containDb("Elasticsearch");//TODO string hard codes
+ return supportDb("Elasticsearch");//TODO string hard codes
}
public boolean supportCouchbase() {
- return containDb("Couchbase");
+ return supportDb("Couchbase");
}
public boolean supportDruid() {
- return containDb("Druid");
+ return supportDb("Druid");
}
public boolean supportMongoDB() {
- return containDb("MongoDB");
+ return supportDb("MongoDB");
}
- private boolean containDb(String dbName) {
- return (sinkdbs != null && sinkdbs.contains(dbName));
+ private boolean supportDb(String dbName) {
+ return (enabledSinkdbs != null && enabledSinkdbs.contains(dbName));
}
//extract DB id from JSON attributes, support multiple attributes
@Override
public String toString() {
- return name;
+ return String.format("Topic %s(enabled=%s, enabledSinkdbs=%s)", name, enabled, enabledSinkdbs);
}
@Override
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
assertNotNull(config.getDmaapKafkaGroup());
assertTrue(config.getDmaapKafkaTimeout() > 0L);
assertTrue(config.getDmaapCheckNewTopicInterval() > 0);
+
+ assertNull(config.getDmaapKafkaLogin());
+ assertNull(config.getDmaapKafkaPass());
+ assertNull(config.getDmaapKafkaSecurityProtocol());
assertTrue(config.getKafkaConsumerCount() > 0);
TopicConfig testTopicConfig = testTopic.getTopicConfig();
testTopicConfig.setSinkdbs(null);
+ testTopicConfig.setEnabledSinkdbs(null);
assertFalse(testTopicConfig.supportElasticsearch());
assertNull(testTopicConfig.getDataFormat2());
testTopic.setDbs(new HashSet<>());
- testTopic.getDbs().add(new Db("Elasticsearch"));
+ Db esDb = new Db("Elasticsearch");
+ esDb.setEnabled(true);
+ testTopic.getDbs().add(esDb);
testTopicConfig = testTopic.getTopicConfig();
package org.onap.datalake.feeder.service;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.lang.reflect.InvocationTargetException;
topicConfig.getSinkdbs().add("MongoDB");
topicConfig.getSinkdbs().add("HDFS");
+
+ topicConfig.setEnabledSinkdbs(new ArrayList<>());
+ topicConfig.getEnabledSinkdbs().add("Elasticsearch");
+ assertTrue(topicConfig.supportElasticsearch());
+
+
createTopicConfig("test4", "TEXT");
when(config.getTimestampLabel()).thenReturn("ts");