Check Db enabled flag 19/90319/1
authorGuobiao Mo <guobiaomo@chinamobile.com>
Fri, 21 Jun 2019 16:34:39 +0000 (09:34 -0700)
committerGuobiao Mo <guobiaomo@chinamobile.com>
Fri, 21 Jun 2019 16:34:39 +0000 (09:34 -0700)
If DB is disabled, don't send data

Issue-ID: DCAEGEN2-1622

Change-Id: I24482259125853d28a9abed2b6e987c169411411
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java

index acb48ae..c171c56 100644 (file)
@@ -161,14 +161,20 @@ public class Topic {
                tConfig.setAggregateArrayPath(getAggregateArrayPath());
                tConfig.setFlattenArrayPath(getFlattenArrayPath());
                tConfig.setTtl(getTtl());
+               
                Set<Db> topicDb = getDbs();
                List<String> dbList = new ArrayList<>();
+               List<String> enabledDbList = new ArrayList<>();
                if (topicDb != null) {
                        for (Db item : topicDb) {
                                dbList.add(item.getName());
+                               if(item.isEnabled()) {
+                                       enabledDbList.add(item.getName());
+                               }
                        }
                }
                tConfig.setSinkdbs(dbList);
+               tConfig.setEnabledSinkdbs(enabledDbList);
 
                return tConfig;
        }
index 8dfe1b1..70778bb 100644 (file)
@@ -45,6 +45,7 @@ public class TopicConfig {
        private String login;
        private String password;
        private List<String> sinkdbs;
+       private List<String> enabledSinkdbs;//only include enabled db
        private boolean enabled;
        private boolean saveRaw;
        private String dataFormat;
@@ -63,27 +64,27 @@ public class TopicConfig {
        }
 
        public boolean supportHdfs() {
-               return containDb("HDFS");
+               return supportDb("HDFS");
        }
 
        public boolean supportElasticsearch() {
-               return containDb("Elasticsearch");//TODO string hard codes
+               return supportDb("Elasticsearch");//TODO string hard codes
        }
 
        public boolean supportCouchbase() {
-               return containDb("Couchbase");
+               return supportDb("Couchbase");
        }
 
        public boolean supportDruid() {
-               return containDb("Druid");
+               return supportDb("Druid");
        }
 
        public boolean supportMongoDB() {
-               return containDb("MongoDB");
+               return supportDb("MongoDB");
        }
 
-       private boolean containDb(String dbName) {
-               return (sinkdbs != null && sinkdbs.contains(dbName));
+       private boolean supportDb(String dbName) {
+               return (enabledSinkdbs != null && enabledSinkdbs.contains(dbName));
        }
 
        //extract DB id from JSON attributes, support multiple attributes
@@ -128,7 +129,7 @@ public class TopicConfig {
 
        @Override
        public String toString() {
-               return name;
+               return String.format("Topic %s(enabled=%s, enabledSinkdbs=%s)", name, enabled, enabledSinkdbs);
        }
 
        @Override
index 0c56d5a..9ac4342 100644 (file)
@@ -27,6 +27,7 @@ import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -57,6 +58,10 @@ public class ApplicationConfigurationTest {
         assertNotNull(config.getDmaapKafkaGroup());
         assertTrue(config.getDmaapKafkaTimeout() > 0L);
         assertTrue(config.getDmaapCheckNewTopicInterval() > 0);
+        
+        assertNull(config.getDmaapKafkaLogin());
+        assertNull(config.getDmaapKafkaPass());
+        assertNull(config.getDmaapKafkaSecurityProtocol());
 
         assertTrue(config.getKafkaConsumerCount() > 0);
 
index f52332a..4bc1832 100644 (file)
@@ -99,11 +99,14 @@ public class TopicConfigTest {
 
         TopicConfig testTopicConfig = testTopic.getTopicConfig();
         testTopicConfig.setSinkdbs(null);
+        testTopicConfig.setEnabledSinkdbs(null);
         assertFalse(testTopicConfig.supportElasticsearch());
         assertNull(testTopicConfig.getDataFormat2());
                 
         testTopic.setDbs(new HashSet<>());
-        testTopic.getDbs().add(new Db("Elasticsearch"));
+        Db esDb = new Db("Elasticsearch");
+        esDb.setEnabled(true);
+        testTopic.getDbs().add(esDb);
         
         testTopicConfig = testTopic.getTopicConfig();
 
index fc05d1d..94eeb08 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.datalake.feeder.service;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 import java.lang.reflect.InvocationTargetException;
@@ -108,6 +109,12 @@ public class StoreServiceTest {
                topicConfig.getSinkdbs().add("MongoDB");
                topicConfig.getSinkdbs().add("HDFS");
 
+
+               topicConfig.setEnabledSinkdbs(new ArrayList<>());
+               topicConfig.getEnabledSinkdbs().add("Elasticsearch");
+               assertTrue(topicConfig.supportElasticsearch());
+               
+               
                createTopicConfig("test4", "TEXT");
 
                when(config.getTimestampLabel()).thenReturn("ts");