Move hard-coded strings to app configuration 01/86101/1
authorGuobiao Mo <guobiaomo@chinamobile.com>
Tue, 23 Apr 2019 22:46:15 +0000 (15:46 -0700)
committerGuobiao Mo <guobiaomo@chinamobile.com>
Tue, 23 Apr 2019 22:46:15 +0000 (15:46 -0700)
Issue-ID: DCAEGEN2-1452
Change-Id: I6577ba3b5a8f877bbaa13d0c590d6a2072331136
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
15 files changed:
components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
components/datalake-handler/feeder/src/main/resources/application.properties
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
components/datalake-handler/feeder/src/test/resources/application.properties

index a349d14..d674ead 100644 (file)
@@ -43,9 +43,9 @@ CREATE TABLE `map_db_topic` (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
 \r
-insert into db (`name`,`host`,`login`,`pass`,`database`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake');\r
+insert into db (`name`,`host`,`login`,`pass`,`database_name`) values ('Couchbase','dl_couchbase','dl','dl1234','datalake');\r
 insert into db (`name`,`host`) values ('Elasticsearch','dl_es');\r
-insert into db (`name`,`host`,`port`,`database`) values ('MongoDB','dl_mongodb',27017,'datalake');\r
+insert into db (`name`,`host`,`port`,`database_name`) values ('MongoDB','dl_mongodb',27017,'datalake');\r
 insert into db (`name`,`host`) values ('Druid','dl_druid');\r
 \r
 \r
index 6bacf13..21ca1b7 100644 (file)
@@ -53,4 +53,8 @@ public class ApplicationConfiguration {
        private boolean async;
        private boolean enableSSL;
 
+       private String timestampLabel;
+       private String rawDataLabel;
+       
+       private String defaultTopicName;
 }
index 4bfe7aa..2b724c7 100644 (file)
@@ -192,6 +192,7 @@ public class DbController {
                Db delDb = dbRepository.findByName(dbName);
                if (delDb == null) {
                        sendError(response, 404, "Db not found: " + dbName);
+                       return;
                }
                Set<Topic> topicRelation = delDb.getTopics();
                topicRelation.clear();
index 747a72c..0869fde 100644 (file)
@@ -49,18 +49,19 @@ import org.springframework.web.bind.annotation.RestController;
 import io.swagger.annotations.ApiOperation;
 
 /**
- * This controller manages topic settings. 
+ * This controller manages topic settings.
  * 
- * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic. 
- * All the settings are saved in database. 
- * topic "_DL_DEFAULT_" is populated at setup by a DB script.
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's
+ * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings
+ * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB
+ * script.
  * 
  * @author Guobiao Mo
  *
  */
 
 @RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) //, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
 public class TopicController {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -70,23 +71,23 @@ public class TopicController {
 
        @Autowired
        private TopicRepository topicRepository;
-       
+
        @Autowired
        private TopicService topicService;
 
        @Autowired
        private DbService dbService;
-       
+
        @GetMapping("/dmaap/")
        @ResponseBody
-       @ApiOperation(value="List all topic names in DMaaP.")
+       @ApiOperation(value = "List all topic names in DMaaP.")
        public List<String> listDmaapTopics() throws IOException {
                return dmaapService.getTopics();
        }
 
        @GetMapping("/")
        @ResponseBody
-       @ApiOperation(value="List all topics' settings.")
+       @ApiOperation(value = "List all topics' settings.")
        public Iterable<Topic> list() throws IOException {
                Iterable<Topic> ret = topicRepository.findAll();
                return ret;
@@ -94,7 +95,7 @@ public class TopicController {
 
        @GetMapping("/{topicName}")
        @ResponseBody
-       @ApiOperation(value="Get a topic's settings.")
+       @ApiOperation(value = "Get a topic's settings.")
        public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
                Topic topic = topicService.getTopic(topicName);
                return topic;
@@ -102,7 +103,7 @@ public class TopicController {
 
        @GetMapping("/{topicName}/dbs")
        @ResponseBody
-       @ApiOperation(value="Get all DBs in a topic.")
+       @ApiOperation(value = "Get all DBs in a topic.")
        public Set<Db> getTopicDbs(@PathVariable("topicName") String topicName) throws IOException {
                Topic topic = topicService.getTopic(topicName);
                Set<Db> dbs = topic.getDbs();
@@ -113,50 +114,50 @@ public class TopicController {
        //One exception is that old DBs are kept
        @PutMapping("/")
        @ResponseBody
-       @ApiOperation(value="Update a topic.")
+       @ApiOperation(value = "Update a topic.")
        public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
 
                if (result.hasErrors()) {
-                       sendError(response, 400, "Error parsing Topic: "+result.toString());
-                       return null; 
+                       sendError(response, 400, "Error parsing Topic: " + result.toString());
+                       return null;
                }
 
                Topic oldTopic = getTopic(topic.getName());
                if (oldTopic == null) {
-                       sendError(response, 404, "Topic not found "+topic.getName());
-                       return null; 
+                       sendError(response, 404, "Topic not found " + topic.getName());
+                       return null;
                } else {
-                       if(!topic.isDefault()) {
+                       if (!topicService.istDefaultTopic(topic)) {
                                Topic defaultTopic = topicService.getDefaultTopic();
                                topic.setDefaultTopic(defaultTopic);
                        }
-                       
+
                        topic.setDbs(oldTopic.getDbs());
                        topicRepository.save(topic);
                        return topic;
                }
        }
+
        @PostMapping("/")
        @ResponseBody
-       @ApiOperation(value="Create a new topic.")
+       @ApiOperation(value = "Create a new topic.")
        public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
-               
+
                if (result.hasErrors()) {
-                       sendError(response, 400, "Error parsing Topic: "+result.toString());
+                       sendError(response, 400, "Error parsing Topic: " + result.toString());
                        return null;
                }
 
                Topic oldTopic = getTopic(topic.getName());
                if (oldTopic != null) {
-                       sendError(response, 400, "Topic already exists "+topic.getName());
+                       sendError(response, 400, "Topic already exists " + topic.getName());
                        return null;
                } else {
-                       if(!topic.isDefault()) {
+                       if (!topicService.istDefaultTopic(topic)) {
                                Topic defaultTopic = topicService.getDefaultTopic();
                                topic.setDefaultTopic(defaultTopic);
                        }
-                       
+
                        topicRepository.save(topic);
                        return topic;
                }
@@ -164,32 +165,32 @@ public class TopicController {
 
        @DeleteMapping("/{topicName}/db/{dbName}")
        @ResponseBody
-       @ApiOperation(value="Delete a DB from a topic.")
+       @ApiOperation(value = "Delete a DB from a topic.")
        public Set<Db> deleteDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
                Topic topic = topicService.getTopic(topicName);
                Set<Db> dbs = topic.getDbs();
                dbs.remove(new Db(dbName));
-                
+
                topicRepository.save(topic);
-               return topic.getDbs();           
+               return topic.getDbs();
        }
 
        @PutMapping("/{topicName}/db/{dbName}")
        @ResponseBody
-       @ApiOperation(value="Add a DB to a topic.")
+       @ApiOperation(value = "Add a DB to a topic.")
        public Set<Db> addDb(@PathVariable("topicName") String topicName, @PathVariable("dbName") String dbName, HttpServletResponse response) throws IOException {
                Topic topic = topicService.getTopic(topicName);
-               Set<Db> dbs = topic.getDbs();           
+               Set<Db> dbs = topic.getDbs();
 
-               Db db = dbService.getDb(dbName);                
+               Db db = dbService.getDb(dbName);
                dbs.add(db);
-                
+
                topicRepository.save(topic);
-               return topic.getDbs();           
+               return topic.getDbs();
        }
-       
+
        private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
                log.info(msg);
-               response.sendError(sc, msg);            
+               response.sendError(sc, msg);
        }
 }
index b7f89fc..2b92e86 100644 (file)
@@ -111,10 +111,6 @@ public class Topic {
                this.name = name;
        }
 
-       public boolean isDefault() {
-               return "_DL_DEFAULT_".equals(name);
-       }
-
        public boolean isEnabled() {
                return is(enabled, Topic::isEnabled);
        }
index a717e10..1e5fb78 100644 (file)
@@ -27,6 +27,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
  
 import org.json.JSONObject;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.slf4j.Logger;
@@ -56,6 +57,9 @@ public class CouchbaseService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
 
+       @Autowired
+       ApplicationConfiguration config;
+
        @Autowired
        private DbService dbService;
        
@@ -73,12 +77,12 @@ public class CouchbaseService {
             log.info("Connect to Couchbase {}", couchbase.getHost());
             // Create a N1QL Primary Index (but ignore if it exists)
             bucket.bucketManager().createN1qlPrimaryIndex(true, false);
+            isReady = true;
         }
         catch(Exception        ex)
         {
             isReady = false;
         }
-        isReady = true;
        }
 
        @PreDestroy
@@ -92,7 +96,7 @@ public class CouchbaseService {
                        //convert to Couchbase JsonObject from org.json JSONObject
                        JsonObject jsonObject = JsonObject.fromJson(json.toString());   
 
-                       long timestamp = jsonObject.getLong("_ts");//this is Kafka time stamp, which is added in StoreService.messageToJson()
+                       long timestamp = jsonObject.getLong(config.getTimestampLabel());//this is Kafka time stamp, which is added in StoreService.messageToJson()
 
                        //setup TTL
                        int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
@@ -102,6 +106,7 @@ public class CouchbaseService {
                        documents.add(doc);
                }
                saveDocuments(documents);               
+               log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); 
        }
 
        public String getId(Topic topic, JSONObject json) {
index 3c75eb6..e9f36b2 100644 (file)
@@ -125,7 +125,7 @@ public class PullThread implements Runnable {
                                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                                        for (ConsumerRecord<String, String> record : partitionRecords) {
                                                messages.add(Pair.of(record.timestamp(), record.value()));
-                                               log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+                                               //log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
                                        }
                                        storeService.saveMessages(partition.topic(), messages);
                                        log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
index d9fe12a..a4f7910 100644 (file)
@@ -24,7 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map; 
+import java.util.Map;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONException;
 import org.json.JSONObject;
 import org.json.XML;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.enumeration.DataFormat;
 import org.slf4j.Logger;
@@ -41,16 +42,17 @@ import org.slf4j.LoggerFactory;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; 
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 
 /**
  * Service to store messages to varieties of DBs
  * 
- * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML?
+ * comment out YAML support, since AML is for config and don't see this data
+ * type in DMaaP. Do we need to support XML?
  * 
  * @author Guobiao Mo
  *
@@ -59,6 +61,9 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 public class StoreService {
        private final Logger log = LoggerFactory.getLogger(this.getClass());
 
+       @Autowired
+       private ApplicationConfiguration config;
+
        @Autowired
        private TopicService topicService;
 
@@ -71,9 +76,9 @@ public class StoreService {
        @Autowired
        private ElasticsearchService elasticsearchService;
 
-       private Map<String, Topic> topicMap = new HashMap<>(); 
+       private Map<String, Topic> topicMap = new HashMap<>();
 
-       private ObjectMapper yamlReader;        
+       private ObjectMapper yamlReader;
 
        @PostConstruct
        private void init() {
@@ -112,14 +117,14 @@ public class StoreService {
                String text = pair.getRight();
 
                //for debug, to be remove
-//             String topicStr = topic.getId();
-//             if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
-       //              log.debug("{} ={}", topicStr, text);
+               //              String topicStr = topic.getId();
+               //              if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
+               //              log.debug("{} ={}", topicStr, text);
                //}
 
                boolean storeRaw = topic.isSaveRaw();
 
-               JSONObject json = null;         
+               JSONObject json = null;
 
                DataFormat dataFormat = topic.getDataFormat();
 
@@ -129,7 +134,7 @@ public class StoreService {
                        break;
                case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON 
                        json = XML.toJSONObject(text);
-                       break;          
+                       break;
                case YAML:// Do we need to support YAML?
                        Object obj = yamlReader.readValue(text, Object.class);
                        ObjectMapper jsonWriter = new ObjectMapper();
@@ -145,10 +150,11 @@ public class StoreService {
                //FIXME for debug, to be remove
                json.remove("_id");
                json.remove("_dl_text_");
+               json.remove("_dl_type_");
 
-               json.put("_ts", timestamp);
+               json.put(config.getTimestampLabel(), timestamp);
                if (storeRaw) {
-                       json.put("_text", text);
+                       json.put(config.getRawDataLabel(), text);
                }
 
                return json;
index 7e0c778..ea1eb4c 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.datalake.feeder.service;
 import java.io.IOException;
 import java.util.Optional;
 
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.repository.TopicRepository;
 import org.slf4j.Logger;
@@ -41,6 +42,9 @@ public class TopicService {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
 
+       @Autowired
+       private ApplicationConfiguration config;
+       
        @Autowired
        private TopicRepository topicRepository;
 
@@ -76,7 +80,13 @@ public class TopicService {
        }
 
        public Topic getDefaultTopic() {
-               return getTopic("_DL_DEFAULT_");
+               return getTopic(config.getDefaultTopicName());
        }
 
+       public boolean istDefaultTopic(Topic topic) {
+               if (topic == null) {
+                       return false;
+               }
+               return topic.getName().equals(config.getDefaultTopicName());
+       }
 }
index 842d01b..1bd5339 100644 (file)
@@ -38,6 +38,13 @@ async=true
 #SSL global flag, if enabled, still need to check each individual DB SSL flag
 enableSSL=false
 
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
+
+       
 #Logging
 logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR
index ee3ceb7..3e7d998 100644 (file)
@@ -61,6 +61,9 @@ public class ApplicationConfigurationTest {
         assertTrue(config.getKafkaConsumerCount() > 0);
         assertNotNull(config.isAsync());
         assertNotNull(config.isEnableSSL());
+        assertNotNull(config.getDefaultTopicName());
+        assertNotNull(config.getRawDataLabel());
+        assertNotNull(config.getTimestampLabel());
     }
 
 }
index afd4503..8be45d6 100644 (file)
@@ -98,9 +98,6 @@ public class TopicTest {
         Topic testTopic = new Topic("test");
         testTopic.setDefaultTopic(defaultTopic);
 
-        assertTrue(defaultTopic.isDefault());
-        assertFalse(testTopic.isDefault());
-
         assertTrue(testTopic.equals(new Topic("test")));
         assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
 
index 9e40a2b..9e1b2d9 100755 (executable)
@@ -34,6 +34,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Topic;
 
 import java.util.ArrayList;
@@ -98,6 +99,8 @@ public class CouchbaseServiceTest {
 
     @Test
     public void testSaveJsonsWithTopicId() {
+       ApplicationConfiguration appConfig = new ApplicationConfiguration();
+       appConfig.setTimestampLabel("datalake_ts_");
 
         String text = "{ data: { data2 : { value : 'hello'}}}";
 
@@ -106,16 +109,19 @@ public class CouchbaseServiceTest {
         Topic topic = new Topic("test getMessageId");
         topic.setMessageIdPath("/data/data2/value");
         List<JSONObject> jsons = new ArrayList<>();
-        json.put("_ts", 1234);
+        json.put(appConfig.getTimestampLabel(), 1234);
         jsons.add(json);
         CouchbaseService couchbaseService = new CouchbaseService();
         couchbaseService.bucket = bucket;
+        couchbaseService.config = appConfig;
         couchbaseService.saveJsons(topic, jsons);
 
     }
 
     @Test
     public void testSaveJsonsWithOutTopicId() {
+       ApplicationConfiguration appConfig = new ApplicationConfiguration();
+       appConfig.setTimestampLabel("datalake_ts_");
 
         String text = "{ data: { data2 : { value : 'hello'}}}";
 
@@ -123,10 +129,11 @@ public class CouchbaseServiceTest {
 
         Topic topic = new Topic("test getMessageId");
         List<JSONObject> jsons = new ArrayList<>();
-        json.put("_ts", 1234);
+        json.put(appConfig.getTimestampLabel(), 1234);
         jsons.add(json);
         CouchbaseService couchbaseService = new CouchbaseService();
         couchbaseService.bucket = bucket;
+        couchbaseService.config = appConfig;
         couchbaseService.saveJsons(topic, jsons);
     }
 
index 0e6db83..99f2239 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.datalake.feeder.service;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
 
@@ -36,6 +37,7 @@ import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.repository.TopicRepository;
@@ -49,6 +51,11 @@ import org.onap.datalake.feeder.repository.TopicRepository;
 @RunWith(MockitoJUnitRunner.class)
 public class TopicServiceTest {
 
+       static String DEFAULT_TOPIC_NAME = "_DL_DEFAULT_";
+
+       @Mock
+       private ApplicationConfiguration config;
+       
        @Mock
        private TopicRepository topicRepository;
 
@@ -74,9 +81,10 @@ public class TopicServiceTest {
 
        @Test
        public void testGetDefaultTopic() {
-               String name = "_DL_DEFAULT_";
-               when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name)));
-               assertEquals(topicService.getDefaultTopic(), new Topic(name));
+               when(topicRepository.findById(DEFAULT_TOPIC_NAME)).thenReturn(Optional.of(new Topic(DEFAULT_TOPIC_NAME)));
+               when(config.getDefaultTopicName()).thenReturn(DEFAULT_TOPIC_NAME);
+               assertEquals(topicService.getDefaultTopic(), new Topic(DEFAULT_TOPIC_NAME));
+               assertTrue(topicService.istDefaultTopic(new Topic(DEFAULT_TOPIC_NAME)));
        }
 
        @Test(expected = IOException.class)
index d6d98e6..b907705 100644 (file)
@@ -17,6 +17,16 @@ kafkaConsumerCount=1
 #tolerate inconsistency when system crash, see PullThread.run()
 async=true
 
+#SSL global flag, if enabled, still need to check each individual DB SSL flag
+enableSSL=false
+
+#names for extra fields that DL adds to each record
+timestampLabel=datalake_ts_
+rawDataLabel=datalake_text_
+
+defaultTopicName=_DL_DEFAULT_
+
+       
 #Logging
 logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR