Use MariaDB to store application configurations 32/84432/2
authorGuobiao Mo <guobiaomo@chinamobile.com>
Sun, 7 Apr 2019 04:52:09 +0000 (21:52 -0700)
committerGuobiao Mo <guobiaomo@chinamobile.com>
Sun, 7 Apr 2019 08:37:31 +0000 (01:37 -0700)
Issue-ID: DCAEGEN2-1400
Change-Id: I86b5bc25d84b98f7ac84b95f1690089dcebe7f0a
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
32 files changed:
components/datalake-handler/feeder/pom.xml
components/datalake-handler/feeder/src/assembly/docker-compose.yml [deleted file]
components/datalake-handler/feeder/src/assembly/scripts/init_db.sql [new file with mode: 0644]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java [new file with mode: 0644]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java [moved from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java with 88% similarity]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java [new file with mode: 0644]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/DbRepository.java [moved from components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java with 80% similarity]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java [deleted file]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java [new file with mode: 0644]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
components/datalake-handler/feeder/src/main/resources/application.properties
components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-kafka-supervisor.json
components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json [new file with mode: 0644]
components/datalake-handler/feeder/src/main/resources/druid/DCAE_CL_OUTPUT-kafka-supervisor.json
components/datalake-handler/feeder/src/main/resources/druid/SEC_FAULT_OUTPUT-kafka-supervisor.json
components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-kafka-supervisor.json
components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json [new file with mode: 0644]
components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json [new file with mode: 0644]
components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json [new file with mode: 0644]
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java [new file with mode: 0644]
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
components/datalake-handler/feeder/src/test/resources/application.properties
components/datalake-handler/pom.xml

index f88baf3..5b47a24 100644 (file)
                <version>1.0.0-SNAPSHOT</version>
        </parent>
 
-       <groupId>org.onap.datalake</groupId>
+       <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId>
        <artifactId>feeder</artifactId>
        <packaging>jar</packaging>
        <name>DataLake Feeder</name>
 
 
        <dependencies>
+       
+               <dependency>
+               <groupId>org.mariadb.jdbc</groupId>
+               <artifactId>mariadb-java-client</artifactId>
+               </dependency>
+       
                <dependency>
                        <groupId>org.json</groupId>
                        <artifactId>json</artifactId>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-actuator</artifactId>
                </dependency>
-
+        
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+        </dependency>
+        
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-data-couchbase</artifactId>
diff --git a/components/datalake-handler/feeder/src/assembly/docker-compose.yml b/components/datalake-handler/feeder/src/assembly/docker-compose.yml
deleted file mode 100644 (file)
index 7ca466b..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-version: '2'
-services:
-
-  datalake:
-    image: moguobiao/datalake-storage
-    container_name: datalake-storage
-    environment:    
-      - no-needed-dmaapHost=10.0.2.15:3904
-    ports:
-      - "1680:1680"
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
new file mode 100644 (file)
index 0000000..2185320
--- /dev/null
@@ -0,0 +1,54 @@
+create database datalake;\r
+use datalake;\r
+\r
+CREATE TABLE `topic` (\r
+  `name` varchar(255) NOT NULL,\r
+  `correlate_cleared_message` bit(1) DEFAULT NULL,\r
+  `enabled` bit(1) DEFAULT NULL,\r
+  `login` varchar(255) DEFAULT NULL,\r
+  `message_id_path` varchar(255) DEFAULT NULL,\r
+  `pass` varchar(255) DEFAULT NULL,\r
+  `save_raw` bit(1) DEFAULT NULL,\r
+  `ttl` int(11) DEFAULT NULL,\r
+  `data_format` varchar(255) DEFAULT NULL,\r
+  `default_topic` varchar(255) DEFAULT NULL,\r
+  PRIMARY KEY (`name`),\r
+  KEY `FK_default_topic` (`default_topic`),\r
+  CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`)\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
+\r
+\r
+CREATE TABLE `db` (\r
+  `name` varchar(255) NOT NULL,\r
+  `host` varchar(255) DEFAULT NULL,\r
+  `login` varchar(255) DEFAULT NULL,\r
+  `pass` varchar(255) DEFAULT NULL,\r
+  `property1` varchar(255) DEFAULT NULL,\r
+  `property2` varchar(255) DEFAULT NULL,\r
+  `property3` varchar(255) DEFAULT NULL,\r
+  PRIMARY KEY (`name`)\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
+\r
+\r
+CREATE TABLE `map_db_topic` (\r
+  `db_name` varchar(255) NOT NULL,\r
+  `topic_name` varchar(255) NOT NULL,\r
+  PRIMARY KEY (`db_name`,`topic_name`),\r
+  KEY `FK_topic_name` (`topic_name`),\r
+  CONSTRAINT `FK_topic_name` FOREIGN KEY (`topic_name`) REFERENCES `topic` (`name`),\r
+  CONSTRAINT `FK_db_name` FOREIGN KEY (`db_name`) REFERENCES `db` (`name`)\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
+\r
+\r
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');\r
+insert into db (name,host) values ('Elasticsearch','dl_es');\r
+insert into db (name,host) values ('MongoDB','dl_mongodb');\r
+insert into db (name,host) values ('Druid','dl_druid');\r
+\r
+\r
+-- in production, default enabled should be off\r
+insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');\r
+insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);\r
+\r
+\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); \r
index 108eb4e..1136e30 100644 (file)
 
 package org.onap.datalake.feeder.config;
 
-import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringBootConfiguration;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration;
 import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.FilterType;
-import org.springframework.context.annotation.ComponentScan;
 
 import lombok.Getter;
 import lombok.Setter;
@@ -42,34 +38,18 @@ import lombok.Setter;
 @Setter
 @SpringBootConfiguration
 @ConfigurationProperties
-//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class))
-//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test
 @EnableAutoConfiguration
-//@Profile("test")
 public class ApplicationConfiguration {
 
-       private String couchbaseHost;
-       private String couchbaseUser;
-       private String couchbasePass;
-       private String couchbaseBucket;
-
-       //    private int mongodbPort;    
-       //  private String mongodbDatabase;    
-
        private String dmaapZookeeperHostPort;
        private String dmaapKafkaHostPort;
        private String dmaapKafkaGroup;
        private long dmaapKafkaTimeout;
 
-//     private boolean dmaapMonitorAllTopics;
        private int dmaapCheckNewTopicIntervalInSec;
-       //private String dmaapHostPort;
-       //private Set<String> dmaapExcludeTopics; 
-       //private Set<String> dmaapIncludeTopics; 
 
        private int kafkaConsumerCount;
 
        private boolean async;
 
-       private String elasticsearchHost;
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/DbController.java
new file mode 100644 (file)
index 0000000..c34befc
--- /dev/null
@@ -0,0 +1,135 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.controller;
+
+import java.io.IOException;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.service.DbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * This controller manages the big data storage settings. All the settings are saved in database. 
+ * 
+ * @author Guobiao Mo
+ *
+ */
+
+@RestController
+@RequestMapping(value = "/dbs", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class DbController {
+
+       private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+       @Autowired
+       private DbRepository dbRepository;
+       
+       @Autowired
+       private DbService dbService;
+
+       //list all dbs 
+       @GetMapping("/")
+       @ResponseBody
+       public Iterable<Db> list() throws IOException {
+               Iterable<Db> ret = dbRepository.findAll();
+               return ret;
+       }
+
+       //Read a db
+       //the topics are missing in the return, since in we use @JsonBackReference on Db's topics 
+       //need to the the following method to retrieve the topic list
+       @GetMapping("/{name}")
+       @ResponseBody
+       public Db getDb(@PathVariable("name") String dbName) throws IOException {
+               Db db = dbService.getDb(dbName);
+               return db;
+       }
+
+       //Read topics in a DB 
+       @GetMapping("/{name}/topics")
+       @ResponseBody
+       public Set<Topic> getDbTopics(@PathVariable("name") String dbName) throws IOException {
+               Db db = dbService.getDb(dbName);
+               Set<Topic> topics = db.getTopics();
+               return topics;
+       }
+
+       //Update Db
+       @PutMapping("/")
+       @ResponseBody
+       public Db updateDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+               if (result.hasErrors()) {
+                       sendError(response, 400, "Error parsing DB: "+result.toString());
+                       return null; 
+               }
+
+               Db oldDb = getDb(db.getName());
+               if (oldDb == null) {
+                       sendError(response, 404, "Db not found: "+db.getName());
+                       return null; 
+               } else {
+                       dbRepository.save(db);                  
+                       return db;
+               }
+       }
+
+       //create a new Db  
+       @PostMapping("/")
+       @ResponseBody
+       public Db createDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+               if (result.hasErrors()) {
+                       sendError(response, 400, "Error parsing DB: "+result.toString());
+                       return null;
+               }
+
+               Db oldDb = getDb(db.getName());
+               if (oldDb != null) {
+                       sendError(response, 400, "Db already exists: "+db.getName());
+                       return null;
+               } else {
+                       dbRepository.save(db);                  
+                       return db;
+               }
+       }
+
+       private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+               log.info(msg);
+               response.sendError(sc, msg);            
+       }
+}
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
@@ -37,8 +38,8 @@ import org.springframework.web.bind.annotation.RestController;
  */
 
 @RestController
-@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE })
-public class PullController {
+@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE })
+public class FeederController {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
        
@@ -49,7 +50,7 @@ public class PullController {
      * @return message that application is started
      * @throws IOException 
      */
-    @RequestMapping("/start")
+    @GetMapping("/start")
     public String start() throws IOException {
        log.info("DataLake feeder starting to pull data from DMaaP...");
        pullService.start();
@@ -59,7 +60,7 @@ public class PullController {
     /**
      * @return message that application stop process is triggered
      */
-    @RequestMapping("/stop")
+    @GetMapping("/stop")
     public String stop() {     
        pullService.shutdown();
        log.info("DataLake feeder is stopped.");
@@ -68,9 +69,9 @@ public class PullController {
     /**
      * @return feeder status
      */
-    @RequestMapping("/status")
+    @GetMapping("/status")
     public String status() {           
-       String status = "to be impletemented";
+       String status = "Feeder is running: "+pullService.isRunning();
        log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. 
        return status;
     }    
index 25028d5..c4aec14 100644 (file)
@@ -21,36 +21,44 @@ package org.onap.datalake.feeder.controller;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
+import java.util.Set;
 
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.onap.datalake.feeder.repository.TopicRepository;
+import org.onap.datalake.feeder.service.DbService;
 import org.onap.datalake.feeder.service.DmaapService;
+import org.onap.datalake.feeder.service.TopicService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.RestController;
 
 /**
- * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as
- * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is
- * used for that topic. All the settings are saved in Couchbase. topic
- * "_DL_DEFAULT_" is populated at setup by a DB script.
+ * This controller manages topic settings. 
+ * 
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic. 
+ * All the settings are saved in database. 
+ * topic "_DL_DEFAULT_" is populated at setup by a DB script.
  * 
  * @author Guobiao Mo
  *
  */
 
 @RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
 public class TopicController {
 
        private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -60,7 +68,13 @@ public class TopicController {
 
        @Autowired
        private TopicRepository topicRepository;
+       
+       @Autowired
+       private TopicService topicService;
 
+       @Autowired
+       private DbService dbService;
+       
        //list all topics in DMaaP
        @GetMapping("/dmaap/")
        @ResponseBody
@@ -77,33 +91,45 @@ public class TopicController {
        }
 
        //Read a topic
-       @GetMapping("/{name}")
+       @GetMapping("/{topicname}")
        @ResponseBody
-       public Topic getTopic(@PathVariable("name") String topicName) throws IOException {
-               //Topic topic = topicRepository.findFirstById(topicName);       
-               Optional<Topic> topic = topicRepository.findById(topicName);
-               if (topic.isPresent()) {
-                       return topic.get();
-               } else {
-                       return null;
-               }
+       public Topic getTopic(@PathVariable("topicname") String topicName) throws IOException {
+               Topic topic = topicService.getTopic(topicName);
+               return topic;
+       }
+
+       //Read DBs in a topic 
+       @GetMapping("/{topicname}/dbs")
+       @ResponseBody
+       public Set<Db> getTopicDbs(@PathVariable("topicname") String topicName) throws IOException {
+               Topic topic = topicService.getTopic(topicName);
+               Set<Db> dbs = topic.getDbs();
+               return dbs;
        }
 
        //Update Topic
+       //This is not a partial update: old topic is wiped out, and new topic is created base on the input json. 
+       //One exception is that old DBs are kept
        @PutMapping("/")
        @ResponseBody
-       public Topic updateTopic(Topic topic, BindingResult result) throws IOException {
+       public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
 
                if (result.hasErrors()) {
-                       log.error(result.toString());
-                       
-                       return null;//TODO return binding error
+                       sendError(response, 400, "Error parsing Topic: "+result.toString());
+                       return null; 
                }
 
-               Topic oldTopic = getTopic(topic.getId());
+               Topic oldTopic = getTopic(topic.getName());
                if (oldTopic == null) {
-                       return null;//TODO return not found error
+                       sendError(response, 404, "Topic not found "+topic.getName());
+                       return null; 
                } else {
+                       if(!topic.isDefault()) {
+                               Topic defaultTopic = topicService.getDefaultTopic();
+                               topic.setDefaultTopic(defaultTopic);
+                       }
+                       
+                       topic.setDbs(oldTopic.getDbs());
                        topicRepository.save(topic);
                        return topic;
                }
@@ -112,20 +138,56 @@ public class TopicController {
        //create a new Topic  
        @PostMapping("/")
        @ResponseBody
-       public Topic createTopic(Topic topic, BindingResult result) throws IOException {
-
+       public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
+               
                if (result.hasErrors()) {
-                       log.error(result.toString());
+                       sendError(response, 400, "Error parsing Topic: "+result.toString());
                        return null;
                }
 
-               Topic oldTopic = getTopic(topic.getId());
+               Topic oldTopic = getTopic(topic.getName());
                if (oldTopic != null) {
-                       return null;//TODO return 'already exists' error
+                       sendError(response, 400, "Topic already exists "+topic.getName());
+                       return null;
                } else {
+                       if(!topic.isDefault()) {
+                               Topic defaultTopic = topicService.getDefaultTopic();
+                               topic.setDefaultTopic(defaultTopic);
+                       }
+                       
                        topicRepository.save(topic);
                        return topic;
                }
        }
 
+       //delete a db from the topic
+       @DeleteMapping("/{topicname}/db/{dbname}")
+       @ResponseBody
+       public Set<Db> deleteDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+               Topic topic = topicService.getTopic(topicName);
+               Set<Db> dbs = topic.getDbs();
+               dbs.remove(new Db(dbName));
+                
+               topicRepository.save(topic);
+               return topic.getDbs();           
+       }
+
+       //add a db to the topic
+       @PutMapping("/{topicname}/db/{dbname}")
+       @ResponseBody
+       public Set<Db> addDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+               Topic topic = topicService.getTopic(topicName);
+               Set<Db> dbs = topic.getDbs();           
+
+               Db db = dbService.getDb(dbName);                
+               dbs.add(db);
+                
+               topicRepository.save(topic);
+               return topic.getDbs();           
+       }
+       
+       private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+               log.info(msg);
+               response.sendError(sc, msg);            
+       }
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
new file mode 100644 (file)
index 0000000..bbaedad
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import java.util.Set;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.ManyToMany;
+import javax.persistence.Table;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
+import lombok.Setter;
+/**
+ * Domain class representing bid data storage
+ * 
+ * @author Guobiao Mo
+ *
+ */
+@Setter
+@Getter
+@Entity
+@Table(name = "db")
+public class Db {
+       @Id
+       private String name;
+
+       private String host;
+       private String login;
+       private String pass;
+
+       private String property1;
+       private String property2;
+       private String property3;        
+
+       @JsonBackReference
+       @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+       /*
+    @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) 
+    @JoinTable(        name                            = "map_db_topic",
+                       joinColumns             = {  @JoinColumn(name="db_name")  },
+                       inverseJoinColumns      = {  @JoinColumn(name="topic_name")  }
+    ) */
+    protected Set<Topic> topics;       
+       
+       public Db() {
+       }
+
+       public Db(String name) {
+               this.name = name;
+       }
+       @Override
+       public boolean equals(Object obj) {             
+               return name.equals(((Db)obj).getName());                
+       }
+
+       @Override
+       public int hashCode() {
+               return name.hashCode();         
+       }
+}
index ace33dc..e1da4d4 100644 (file)
 */
 package org.onap.datalake.feeder.domain;
 
+import java.util.Set;
 import java.util.function.Predicate;
 
-import javax.validation.constraints.NotNull;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.JoinTable;
+import javax.persistence.ManyToMany;
+import javax.persistence.ManyToOne;
+import javax.persistence.Table;
 
 import org.apache.commons.lang3.StringUtils;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.annotation.Transient;
-import org.springframework.data.couchbase.core.mapping.Document;
 
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
 import lombok.Setter;
  
 /**
- * Domain class representing topic table in Couchbase
+ * Domain class representing topic 
  * 
  * @author Guobiao Mo
  *
  */
-@Document
 @Setter
+@Getter
+@Entity
+@Table(name = "topic")
 public class Topic {
-       @NotNull
        @Id
-       private String id;//topic name 
+       private String name;//topic name 
 
-       @Transient
+       @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) 
+       @JoinColumn(name = "default_topic", nullable = true)
        private Topic defaultTopic;
 
        //for protected Kafka topics
        private String login;
        private String pass;
 
+       //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
+       @JsonBackReference
+       //@JsonManagedReference
+    @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) 
+    @JoinTable(        name                            = "map_db_topic",
+                       joinColumns             = {  @JoinColumn(name="topic_name")  },
+                       inverseJoinColumns      = {  @JoinColumn(name="db_name")  }
+    )
+       protected Set<Db> dbs;
+       
        /**
         *  indicate if we should monitor this topic
         */
@@ -60,20 +82,14 @@ public class Topic {
        /**
         * save raw message text
         */
+       @Column(name = "save_raw")
        private Boolean saveRaw;
 
        /**
-        * true: save it to Elasticsearch false: don't save null: use default
-        */
-       private Boolean supportElasticsearch;
-       private Boolean supportCouchbase;
-       private Boolean supportDruid;
-
-       /**
-        * need to explicitly tell feeder the data format of the message
+        * need to explicitly tell feeder the data format of the message.
         * support JSON, XML, YAML, TEXT
         */
-       private DataFormat dataFormat;
+       private String dataFormat;
 
        /**
         * TTL in day
@@ -81,26 +97,24 @@ public class Topic {
        private Integer ttl; 
        
        //if this flag is true, need to correlate alarm cleared message to previous alarm 
+       @Column(name = "correlate_cleared_message")
        private Boolean correlateClearedMessage;
        
        //the value in the JSON with this path will be used as DB id
+       @Column(name = "message_id_path")
        private String messageIdPath;
 
        public Topic() {
        }
 
-       public Topic(String id) {
-               this.id = id;
+       public Topic(String name) {
+               this.name = name;
        }
 
-       public String getId() {
-               return id;
+       public boolean isDefault() {
+               return "_DL_DEFAULT_".equals(name);
        }
        
-       public void setDefaultTopic(Topic defaultTopic) {
-               this.defaultTopic = defaultTopic;
-       }
-
        public boolean isEnabled() {
                return is(enabled, Topic::isEnabled);   
        }
@@ -121,7 +135,7 @@ public class Topic {
        
        public DataFormat getDataFormat() {
                if (dataFormat != null) {
-                       return dataFormat;
+                       return DataFormat.fromString(dataFormat);
                } else if (defaultTopic != null) {
                        return defaultTopic.getDataFormat();
                } else {
@@ -148,24 +162,51 @@ public class Topic {
                return is(saveRaw, Topic::isSaveRaw);
        }
 
-       public boolean isSupportElasticsearch() {
-               return is(supportElasticsearch, Topic::isSupportElasticsearch);
+       public boolean supportElasticsearch() {
+               return containDb("Elasticsearch");//TODO string hard codes
+       }
+
+       public boolean supportCouchbase() {
+               return containDb("Couchbase");
        }
 
-       public boolean isSupportCouchbase() {
-               return is(supportCouchbase, Topic::isSupportCouchbase);
+       public boolean supportDruid() {
+               return containDb("Druid");
        }
 
-       public boolean isSupportDruid() {
-               return is(supportDruid, Topic::isSupportDruid);
+       public boolean supportMongoDB() {
+               return containDb("MongoDB");
        }
 
-       //extract DB id from a JSON attribute, TODO support multiple attributes
+       private boolean containDb(String dbName) {              
+               Db db = new Db(dbName);
+               
+               if(dbs!=null && dbs.contains(db)) {
+                       return true;
+               }
+               
+               if (defaultTopic != null) {
+                       return defaultTopic.containDb(dbName);
+               } else {
+                       return false;
+               }
+       }
+       
+       //extract DB id from JSON attributes, support multiple attributes
        public String getMessageId(JSONObject json) {
                String id = null;
                
                if(StringUtils.isNotBlank(messageIdPath)) {
-                       id = json.query(messageIdPath).toString();
+                       String[] paths=messageIdPath.split(",");
+                       
+                       StringBuilder sb= new StringBuilder();
+                       for(int i=0; i<paths.length; i++) {
+                               if(i>0) {
+                                       sb.append('^');                                 
+                               }
+                               sb.append(json.query(paths[i]).toString());                             
+                       }
+                       id = sb.toString();
                }
                
                return id;
@@ -173,20 +214,17 @@ public class Topic {
        
        @Override
        public String toString() {
-               return id;
+               return name;
        }
 
-       /**
-        * @return the messageIdPath
-        */
-       public String getMessageIdPath() {
-               return messageIdPath;
+       @Override
+       public boolean equals(Object obj) {             
+               return name.equals(((Topic)obj).getName());             
        }
 
-       /**
-        * @param messageIdPath the messageIdPath to set
-        */
-       public void setMessageIdPath(String messageIdPath) {
-               this.messageIdPath = messageIdPath;
+       @Override
+       public int hashCode() {
+               return name.hashCode();         
        }
+       
 }
 */\r
 package org.onap.datalake.feeder.repository;\r
 \r
+import org.onap.datalake.feeder.domain.Db;\r
+\r
+import org.springframework.data.repository.CrudRepository;\r
+\r
 /**\r
+ * \r
+ * Db Repository \r
+ * \r
  * @author Guobiao Mo\r
  *\r
- */\r
-public interface TopicRepositoryCustom  {\r
-       long updateTopic(String topic, Boolean state);\r
+ */ \r
+\r
+public interface DbRepository extends CrudRepository<Db, String> {\r
+\r
 }\r
index 37d1a66..2d9adef 100644 (file)
 package org.onap.datalake.feeder.repository;\r
 \r
 import org.onap.datalake.feeder.domain.Topic;\r
-import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;\r
-import org.springframework.data.couchbase.core.query.Query;\r
-import org.springframework.data.couchbase.core.query.ViewIndexed;\r
-import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository;\r
-import org.springframework.data.domain.Page;\r
-import org.springframework.data.domain.Pageable;\r
\r
 \r
-import java.util.List;\r
+import org.springframework.data.repository.CrudRepository;\r
 \r
 /**\r
  * \r
- * Topic Repository interface, implementation is taken care by Spring framework.\r
- * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl. \r
+ * Topic Repository \r
  * \r
  * @author Guobiao Mo\r
  *\r
- */\r
-@ViewIndexed(designDoc = "topic", viewName = "all")\r
-public interface TopicRepository extends CouchbasePagingAndSortingRepository<Topic, String>, TopicRepositoryCustom {\r
-/*\r
-       Topic findFirstById(String topic);\r
-\r
-       Topic findByIdAndState(String topic, boolean state);\r
-\r
-    //Supports native JSON query string\r
-    @Query("{topic:'?0'}")\r
-    Topic findTopicById(String topic);\r
-\r
-    @Query("{topic: { $regex: ?0 } })")\r
-    List<Topic> findTopicByRegExId(String topic);\r
-\r
-\r
-    //Page<Topic> findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable);\r
-\r
-    @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}")\r
-    Topic findByCompanyAndAreaId(String companyId, String areaId);\r
+ */ \r
 \r
-    @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END")\r
-    List<Topic> findByPhoneNumber(String telephoneNumber);\r
+public interface TopicRepository extends CrudRepository<Topic, String> {\r
 \r
-    @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1")\r
-    Long countBuildings(String companyId);\r
-    */\r
 }\r
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
deleted file mode 100644 (file)
index 018d5b9..0000000
+++ /dev/null
@@ -1,67 +0,0 @@
-/*\r
-* ============LICENSE_START=======================================================\r
-* ONAP : DataLake\r
-* ================================================================================\r
-* Copyright 2019 China Mobile\r
-*=================================================================================\r
-* Licensed under the Apache License, Version 2.0 (the "License");\r
-* you may not use this file except in compliance with the License.\r
-* You may obtain a copy of the License at\r
-*\r
-*     http://www.apache.org/licenses/LICENSE-2.0\r
-*\r
-* Unless required by applicable law or agreed to in writing, software\r
-* distributed under the License is distributed on an "AS IS" BASIS,\r
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-* See the License for the specific language governing permissions and\r
-* limitations under the License.\r
-* ============LICENSE_END=========================================================\r
-*/\r
-package org.onap.datalake.feeder.repository;\r
-\r
-import org.onap.datalake.feeder.domain.Topic;\r
-import org.springframework.beans.factory.annotation.Autowired;\r
-import org.springframework.data.couchbase.core.CouchbaseTemplate;\r
-/*\r
-import org.springframework.data.mongodb.MongoDbFactory;\r
-import org.springframework.data.mongodb.core.MongoTemplate;\r
-import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;\r
-import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;\r
-import org.springframework.data.mongodb.core.convert.MappingMongoConverter;\r
-import org.springframework.data.mongodb.core.mapping.MongoMappingContext;\r
-import org.springframework.data.mongodb.core.query.Criteria;\r
-import org.springframework.data.mongodb.core.query.Query;\r
-import org.springframework.data.mongodb.core.query.Update;  \r
-\r
-import com.mongodb.WriteResult;\r
-import com.mongodb.client.result.UpdateResult;\r
-*/\r
-import java.util.List;\r
-\r
-/**\r
- * @author Guobiao Mo\r
- *\r
- */\r
-public class TopicRepositoryImpl implements TopicRepositoryCustom {\r
-\r
-    @Autowired\r
-    CouchbaseTemplate template;\r
-    \r
-    @Override\r
-    public long updateTopic(String topic, Boolean state) {\r
-/*\r
-        Query query = new Query(Criteria.where("id").is(topic));\r
-        Update update = new Update();\r
-        update.set("state", state);\r
-\r
-        UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class);\r
-\r
-        if(result!=null)\r
-            return result.getModifiedCount();\r
-        else\r
-  */          return 0L;\r
-       \r
-       \r
-       \r
-    }\r
-}\r
index 3543258..f74829e 100644 (file)
@@ -27,7 +27,7 @@ import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
  
 import org.json.JSONObject;
-import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,18 +57,20 @@ public class CouchbaseService {
        private final Logger log = LoggerFactory.getLogger(this.getClass());
 
        @Autowired
-       private ApplicationConfiguration config;
-
+       private DbService dbService;
+       
        Bucket bucket;          
 
        @PostConstruct
        private void init() {
         // Initialize Couchbase Connection
-        Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost());
-        cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass());
-        bucket = cluster.openBucket(config.getCouchbaseBucket());
+               
+               Db couchbase = dbService.getCouchbase();
+        Cluster cluster = CouchbaseCluster.create(couchbase.getHost());
+        cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
+        bucket = cluster.openBucket(couchbase.getProperty1());
 
-               log.info("Connect to Couchbase " + config.getCouchbaseHost());
+               log.info("Connect to Couchbase " + couchbase.getHost());
                
         // Create a N1QL Primary Index (but ignore if it exists)
         bucket.bucketManager().createN1qlPrimaryIndex(true, false);                 
@@ -90,15 +92,21 @@ public class CouchbaseService {
                        //setup TTL
                        int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
                        
-                       String id = getId(topic.getId());
+                       String id = getId(topic, json);
                        JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
                        documents.add(doc);
                }
                saveDocuments(documents);               
        }
 
-
-       private String getId(String topicStr) {
+       public String getId(Topic topic, JSONObject json) {
+               //if this topic requires extract id from JSON
+               String id = topic.getMessageId(json);
+               if(id != null) {
+                       return id;
+               }
+               
+               String topicStr= topic.getName();               
                //String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
 
                //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
@@ -106,7 +114,7 @@ public class CouchbaseService {
                // increment by 1, initialize at 0 if counter doc not found
                //TODO how slow is this compared with above UUID approach?
                JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 
-               String id = topicStr +":"+ nextIdNumber.content();
+               id = topicStr +":"+ nextIdNumber.content();
                
                return id;
        }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DbService.java
new file mode 100644 (file)
index 0000000..f0d943d
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.Optional;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for Dbs 
+ * 
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class DbService {
+
+       private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+       @Autowired
+       private DbRepository dbRepository;
+       
+       public Db getDb(String name) {
+               Optional<Db> ret = dbRepository.findById(name);
+               return ret.isPresent() ? ret.get() : null;
+       }       
+
+       public Db getCouchbase() {
+               return getDb("Couchbase");
+       }
+
+       public Db getElasticsearch() {
+               return getDb("Elasticsearch");
+       }
+
+       public Db getMongoDB() {
+               return getDb("MongoDB");
+       }
+
+       public Db getDruid() {
+               return getDb("Druid");
+       }       
+
+}
index cbcc5f8..1f637e1 100644 (file)
@@ -31,6 +31,7 @@ import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -40,6 +41,7 @@ import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,14 +62,18 @@ public class ElasticsearchService {
 
        @Autowired
        private ApplicationConfiguration config;
+       
+       @Autowired
+       private DbService dbService;
 
        private RestHighLevelClient client;
        ActionListener<BulkResponse> listener;
 
        @PostConstruct
        private void init() {
-               String elasticsearchHost = config.getElasticsearchHost();
-
+               Db elasticsearch = dbService.getElasticsearch();
+               String elasticsearchHost = elasticsearch.getHost();
+               
                // Initialize the Connection
                client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
 
@@ -93,14 +99,16 @@ public class ElasticsearchService {
        
        public void ensureTableExist(String topic) throws IOException {
                String topicLower = topic.toLowerCase();
-
-               CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
-               try {
+               
+               GetIndexRequest request = new GetIndexRequest();
+               request.indices(topicLower);
+               
+               boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
+               if(!exists){
+                       CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); 
                        CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
                        log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
-               }catch(ElasticsearchStatusException e) {
-                       log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());                       
-               }
+               } 
        }
        
        //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
@@ -114,7 +122,10 @@ public class ElasticsearchService {
                                        continue;
                                }
                        }
-                       request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
+                       
+                       String id = topic.getMessageId(json); //id can be null
+                       
+                       request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
                }
                if(config.isAsync()) {
                        client.bulkAsync(request, RequestOptions.DEFAULT, listener);                    
@@ -122,19 +133,23 @@ public class ElasticsearchService {
                        try {
                                client.bulk(request, RequestOptions.DEFAULT);
                        } catch (IOException e) { 
-                               log.error( topic.getId() , e);
+                               log.error( topic.getName() , e);
                        }
                }
        }
        
        private boolean correlateClearedMessage(JSONObject json) {
-               boolean found = false;
-               
+               boolean found = true;
+                               
                /*TODO
                 * 1. check if this is a alarm cleared message
                 * 2. search previous alarm message
                 * 3. update previous message, if success, set found=true
                 */
+               //for Sonar test, remove the following
+               if(json.isNull("kkkkk")) {
+                       found = false;
+               }
                
                return found; 
        }
index 3dcbd8e..4433c8c 100644 (file)
@@ -27,8 +27,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import javax.annotation.PostConstruct;
-
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,10 +56,13 @@ public class PullService {
        @Autowired
        private ApplicationConfiguration config;
 
-       @PostConstruct
-       private void init() {
+       /**
+        * @return the isRunning
+        */
+       public boolean isRunning() {
+               return isRunning;
        }
-
        /**
         * start pulling.
         * 
@@ -109,6 +110,7 @@ public class PullService {
                        executorService.awaitTermination(10L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                        logger.error("executor.awaitTermination", e);
+                       Thread.currentThread().interrupt();
                }
                
                isRunning = false;
index 1cd3a8a..84e4fb7 100644 (file)
@@ -152,11 +152,11 @@ public class StoreService {
        }
 
        private void saveJsons(Topic topic, List<JSONObject> jsons) {
-               if (topic.isSupportCouchbase()) {
+               if (topic.supportCouchbase()) {
                        couchbaseService.saveJsons(topic, jsons);
                }
 
-               if (topic.isSupportElasticsearch()) {
+               if (topic.supportElasticsearch()) {
                        elasticsearchService.saveJsons(topic, jsons);
                }
        }
index 9b8fabc..4e10a36 100644 (file)
@@ -34,8 +34,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * Service for topics topic setting is stored in Couchbase, bucket 'dl', see
- * application.properties for Spring setup
+ * Service for topics 
  * 
  * @author Guobiao Mo
  *
@@ -68,15 +67,14 @@ public class TopicService {
                return null;
        }
                
+       //TODO caller should not modify the returned topic, maybe return a clone
        public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
                Topic topic = getTopic(topicStr);
                if (topic == null) {
-                       topic = new Topic(topicStr);
+                       topic = getDefaultTopic();
                }
-
-               topic.setDefaultTopic(getDefaultTopic());
                
-               if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) { 
+               if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { 
                        elasticsearchService.ensureTableExist(topicStr); 
                }
                return topic;
index a0ab90f..ea94d00 100644 (file)
@@ -2,6 +2,16 @@
 server.port = 1680
 
 
+# Spring connection to MariaDB for ORM
+#spring.jpa.hibernate.ddl-auto=update
+spring.jpa.hibernate.ddl-auto=none
+spring.jpa.show-sql=false
+
+#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+spring.datasource.url=jdbc:mariadb://dl_mariadb:3306/datalake?autoReconnect=true&amp;useUnicode=true&amp;characterEncoding=UTF-8
+spring.datasource.username=nook
+spring.datasource.password=nook123
+
 
 #For Beijing lab
 #dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
@@ -30,24 +40,3 @@ logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
  
-# Spring connection to Couchbase DB for ORM
-
-#not work when run in osboxes, but works in Eclipse
-spring.couchbase.bootstrap-hosts=dl_couchbase
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-
-#a user with name as bucket.name must be created, with the pass as bucket.password
-# https://stackoverflow.com/questions/51496589/bucket-password-in-couchbase
-spring.couchbase.bucket.name=dl
-spring.couchbase.bucket.password=dl1234
-spring.data.couchbase.auto-index=true
-
-#DL Feeder DB: Couchbase
-couchbaseHost=dl_couchbase 
-#couchbaseHost=172.30.1.74
-couchbaseUser=dmaap
-couchbasePass=dmaap1234
-couchbaseBucket=dmaap
-
-#DL Feeder DB: Elasticsearch
-elasticsearchHost=dl_es
index e536c7b..a20e5eb 100644 (file)
                "taskDuration": "PT1H",
                "completionTimeout": "PT30M",
                "consumerProperties": {
-                       "bootstrap.servers": "dl_dmaap:9092"
+                       "bootstrap.servers": "message-router-kafka:9092"
                },
                "useEarliestOffset": true
        }
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/AAI-EVENT-sample-format.json
new file mode 100644 (file)
index 0000000..cb3c98d
--- /dev/null
@@ -0,0 +1,52 @@
+{\r
+       "_id": "5bb3dfae5bea3f1cb49d4f3f",\r
+       "cambria.partition": "AAI",\r
+       "event-header": {\r
+               "severity": "NORMAL",\r
+               "entity-type": "esr-thirdparty-sdnc",\r
+               "top-entity-type": "esr-thirdparty-sdnc",\r
+               "entity-link": "/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1",\r
+               "event-type": "AAI-EVENT",\r
+               "domain": "dev",\r
+               "action": "CREATE",\r
+               "sequence-number": "0",\r
+               "id": "69f2935f-c3c1-4a63-b3f1-519c3898328b",\r
+               "source-name": "ying",\r
+               "version": "v11",\r
+               "timestamp": "20180919-06:20:44:216"\r
+       },\r
+       "entity": {\r
+               "thirdparty-sdnc-id": "SDWANController1",\r
+               "resource-version": "1537338043473",\r
+               "location": "Core",\r
+               "product-name": "SD-WAN",\r
+               "esr-system-info-list": {\r
+                       "esr-system-info": [\r
+                               {\r
+                                       "esr-system-info-id": "SDWANController1-ESR-1",\r
+                                       "system-type": "example-system-type-val-12078",\r
+                                       "service-url": "https://172.19.48.77:18008",\r
+                                       "ssl-cacert": "example-ssl-cacert-val-20589",\r
+                                       "type": "WAN",\r
+                                       "ssl-insecure": true,\r
+                                       "system-status": "example-system-status-val-23435",\r
+                                       "version": "V3R1",\r
+                                       "passive": true,\r
+                                       "password": "Onap@12345",\r
+                                       "protocol": "RESTCONF",\r
+                                       "ip-address": "172.19.48.77",\r
+                                       "cloud-domain": "example-cloud-domain-val-76077",\r
+                                       "user-name": "northapi1@huawei.com",\r
+                                       "system-name": "SDWANController",\r
+                                       "port": "18008",\r
+                                       "vendor": "IP-WAN",\r
+                                       "resource-version": "1537338044166",\r
+                                       "remote-path": "example-remotepath-val-5833",\r
+                                       "default-tenant": "example-default-tenant-val-71148"\r
+                               }\r
+                       ]\r
+               }\r
+       },\r
+       "_dl_type_": "JSON",\r
+       "_dl_text_": "{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"esr-thirdparty-sdnc\",\"top-entity-type\":\"esr-thirdparty-sdnc\",\"entity-link\":\"/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"CREATE\",\"sequence-number\":\"0\",\"id\":\"69f2935f-c3c1-4a63-b3f1-519c3898328b\",\"source-name\":\"ying\",\"version\":\"v11\",\"timestamp\":\"20180919-06:20:44:216\"},\"entity\":{\"thirdparty-sdnc-id\":\"SDWANController1\",\"resource-version\":\"1537338043473\",\"location\":\"Core\",\"product-name\":\"SD-WAN\",\"esr-system-info-list\":{\"esr-system-info\":[{\"esr-system-info-id\":\"SDWANController1-ESR-1\",\"system-type\":\"example-system-type-val-12078\",\"service-url\":\"https://172.19.48.77:18008\",\"ssl-cacert\":\"example-ssl-cacert-val-20589\",\"type\":\"WAN\",\"ssl-insecure\":true,\"system-status\":\"example-system-status-val-23435\",\"version\":\"V3R1\",\"passive\":true,\"password\":\"Onap@12345\",\"protocol\":\"RESTCONF\",\"ip-address\":\"172.19.48.77\",\"cloud-domain\":\"example-cloud-domain-val-76077\",\"user-name\":\"northapi1@huawei.com\",\"system-name\":\"SDWANController\",\"port\":\"18008\",\"vendor\":\"IP-WAN\",\"resource-version\":\"1537338044166\",\"remote-path\":\"example-remotepath-val-5833\",\"default-tenant\":\"example-default-tenant-val-71148\"}]}}}"\r
+}
\ No newline at end of file
index 16eb163..19bf6ed 100644 (file)
                "taskDuration": "PT1H",
                "completionTimeout": "PT30M",
                "consumerProperties": {
-                       "bootstrap.servers": "dl_dmaap:9092"
+                       "bootstrap.servers": "message-router-kafka:9092"
                },
                "useEarliestOffset": true
        }
index 3c871a8..6797b19 100644 (file)
                "taskDuration": "PT1H",
                "completionTimeout": "PT30M",
                "consumerProperties": {
-                       "bootstrap.servers": "dl_dmaap:9092"
+                       "bootstrap.servers": "message-router-kafka:9092"
                },
                "useEarliestOffset": true
        }
index c3f6037..f910ace 100644 (file)
                "taskDuration": "PT1H",
                "completionTimeout": "PT30M",
                "consumerProperties": {
-                       "bootstrap.servers": "dl_dmaap:9092"
+                       "bootstrap.servers": "message-router-kafka:9092"
                },
                "useEarliestOffset": true
        }
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/msgrtr.apinode.metrics.dmaap-sample-format.json
new file mode 100644 (file)
index 0000000..957060f
--- /dev/null
@@ -0,0 +1,75 @@
+{\r
+       "_id": {\r
+               "$oid": "5bb3d3aa5bea3f41300957c6"\r
+       },\r
+       "sendEpsShort": {\r
+               "summary": "0.0 send eps (10 mins)",\r
+               "raw": 0\r
+       },\r
+       "recvEpsInstant": {\r
+               "summary": "0.03333333333333333 recv eps (1 min)",\r
+               "raw": 0.03333333333333333\r
+       },\r
+       "fanOut": {\r
+               "summary": "0.7051873815491838 sends per recv",\r
+               "raw": 0.7051873815491838\r
+       },\r
+       "sendEpsLong": {\r
+               "summary": "0.0 send eps (1 hr)",\r
+               "raw": 0\r
+       },\r
+       "kafkaConsumerTimeouts": {\r
+               "summary": "164 Kafka Consumers Timedout",\r
+               "raw": 164\r
+       },\r
+       "recvEpsLong": {\r
+               "summary": "0.03333333333333333 recv eps (1 hr)",\r
+               "raw": 0.03333333333333333\r
+       },\r
+       "sendEpsInstant": {\r
+               "summary": "0.0 send eps (1 min)",\r
+               "raw": 0\r
+       },\r
+       "recvEpsShort": {\r
+               "summary": "0.03333333333333333 recv eps (10 mins)",\r
+               "raw": 0.03333333333333333\r
+       },\r
+       "kafkaConsumerClaims": {\r
+               "summary": "1 Kafka Consumers Claimed",\r
+               "raw": 1\r
+       },\r
+       "version": {\r
+               "summary": "Version 1.1.3",\r
+               "raw": 0\r
+       },\r
+       "upTime": {\r
+               "summary": "604800 seconds since start",\r
+               "raw": 604800\r
+       },\r
+       "sendTotalEvents": {\r
+               "summary": "46139 Total events sent since start",\r
+               "raw": 46139\r
+       },\r
+       "hostname": "3d5704fccbc5",\r
+       "kafkaConsumerCacheMiss": {\r
+               "summary": "179 Kafka Consumer Cache Misses",\r
+               "raw": 179\r
+       },\r
+       "metricsSendTime": "1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC",\r
+       "kafkaConsumerCacheHit": {\r
+               "summary": "317143 Kafka Consumer Cache Hits",\r
+               "raw": 317143\r
+       },\r
+       "now": 1537639709380,\r
+       "transactionEnabled": false,\r
+       "startTime": {\r
+               "summary": "1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC",\r
+               "raw": 1537034908\r
+       },\r
+       "recvTotalEvents": {\r
+               "summary": "65428 Total events received since start",\r
+               "raw": 65428\r
+       },\r
+       "_dl_type_": "JSON",\r
+       "_dl_text_": "{\"sendEpsShort\":{\"summary\":\"0.0 send eps (10 mins)\",\"raw\":0},\"recvEpsInstant\":{\"summary\":\"0.03333333333333333 recv eps (1 min)\",\"raw\":0.03333333333333333},\"fanOut\":{\"summary\":\"0.7051873815491838 sends per recv\",\"raw\":0.7051873815491838},\"sendEpsLong\":{\"summary\":\"0.0 send eps (1 hr)\",\"raw\":0},\"kafkaConsumerTimeouts\":{\"summary\":\"164 Kafka Consumers Timedout\",\"raw\":164},\"recvEpsLong\":{\"summary\":\"0.03333333333333333 recv eps (1 hr)\",\"raw\":0.03333333333333333},\"sendEpsInstant\":{\"summary\":\"0.0 send eps (1 min)\",\"raw\":0},\"recvEpsShort\":{\"summary\":\"0.03333333333333333 recv eps (10 mins)\",\"raw\":0.03333333333333333},\"kafkaConsumerClaims\":{\"summary\":\"1 Kafka Consumers Claimed\",\"raw\":1},\"version\":{\"summary\":\"Version 1.1.3\",\"raw\":0},\"upTime\":{\"summary\":\"604800 seconds since start\",\"raw\":604800},\"sendTotalEvents\":{\"summary\":\"46139 Total events sent since start\",\"raw\":46139},\"hostname\":\"3d5704fccbc5\",\"kafkaConsumerCacheMiss\":{\"summary\":\"179 Kafka Consumer Cache Misses\",\"raw\":179},\"metricsSendTime\":\"1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC\",\"kafkaConsumerCacheHit\":{\"summary\":\"317143 Kafka Consumer Cache Hits\",\"raw\":317143},\"now\":1537639709380,\"transactionEnabled\":false,\"startTime\":{\"summary\":\"1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC\",\"raw\":1537034908},\"recvTotalEvents\":{\"summary\":\"65428 Total events received since start\",\"raw\":65428}}"\r
+}\r
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.DCAE_CL_OUTPUT-sample-format.json
new file mode 100644 (file)
index 0000000..8de08f7
--- /dev/null
@@ -0,0 +1,25 @@
+{\r
+       "_id": "5bb3d3ad5bea3f41300959ba",\r
+       "closedLoopEventClient": "DCAE.HolmesInstance",\r
+       "policyVersion": "1.0.0.5",\r
+       "policyName": "CCVPN",\r
+       "policyScope": "service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8",\r
+       "target_type": "VM",\r
+       "AAI": {\r
+               "serviceType": "TestService",\r
+               "service-instance_service-instance-id": "200",\r
+               "globalSubscriberId": "Customer1",\r
+               "vserver_vserver-name": "TBD",\r
+               "network-information_network-id": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200"\r
+       },\r
+       "closedLoopAlarmStart": "1532769303924000",\r
+       "closedLoopEventStatus": "ONSET",\r
+       "version": "1.0.2",\r
+       "closedLoopControlName": "ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b",\r
+       "target": "vserver.vserver-name",\r
+       "closedLoopAlarmEnd": "1532769303924000",\r
+       "requestID": "6f455b14-efd9-450a-bf78-e47d55b6da87",\r
+       "from": "DCAE",\r
+       "_dl_type_": "JSON",\r
+       "_dl_text_": "{\"closedLoopEventClient\":\"DCAE.HolmesInstance\",\"policyVersion\":\"1.0.0.5\",\"policyName\":\"CCVPN\",\"policyScope\":\"service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8\",\"target_type\":\"VM\",\"AAI\":{\"serviceType\":\"TestService\",\"service-instance.service-instance-id\":\"200\",\"globalSubscriberId\":\"Customer1\",\"vserver.vserver-name\":\"TBD\",\"network-information.network-id\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200\"},\"closedLoopAlarmStart\":1532769303924000,\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"closedLoopControlName\":\"ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b\",\"target\":\"vserver.vserver-name\",\"closedLoopAlarmEnd\":1532769303924000,\"requestID\":\"6f455b14-efd9-450a-bf78-e47d55b6da87\",\"from\":\"DCAE\"}"\r
+}
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json b/components/datalake-handler/feeder/src/main/resources/druid/unauthenticated.SEC_FAULT_OUTPUT-sample-format.json
new file mode 100644 (file)
index 0000000..bb506d5
--- /dev/null
@@ -0,0 +1,57 @@
+{\r
+       "_id": "5bb3d3b45bea3f41300960f8",\r
+       "event": {\r
+               "commonEventHeader": {\r
+                       "sourceId": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",\r
+                       "startEpochMicrosec": "1537438335829000",\r
+                       "eventId": "2ef8b41b-b081-477b-9d0b-1aaaa3b69857",\r
+                       "domain": "fault",\r
+                       "lastEpochMicrosec": 1537438335829000,\r
+                       "eventName": "Fault_Route_Status",\r
+                       "sourceName": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",\r
+                       "priority": "High",\r
+                       "version": 3,\r
+                       "reportingEntityName": "Domain_Contorller"\r
+               },\r
+               "faultFields": {\r
+                       "eventSeverity": "CRITICAL",\r
+                       "alarmCondition": "Route_Status",\r
+                       "faultFieldsVersion": 2,\r
+                       "specificProblem": "Fault_SOTN_Service_Status",\r
+                       "alarmAdditionalInformation": [\r
+                               {\r
+                                       "name": "networkId",\r
+                                       "value": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100"\r
+                               },\r
+                               {\r
+                                       "name": "node",\r
+                                       "value": "11.11.11.11"\r
+                               },\r
+                               {\r
+                                       "name": "tp-id",\r
+                                       "value": "27"\r
+                               },\r
+                               {\r
+                                       "name": "oper-status",\r
+                                       "value": "down"\r
+                               },\r
+                               {\r
+                                       "name": "network-ref",\r
+                                       "value": "providerId/5555/clientId/6666/topologyId/33"\r
+                               },\r
+                               {\r
+                                       "name": "node-ref",\r
+                                       "value": "0.51.0.103"\r
+                               },\r
+                               {\r
+                                       "name": "tp-ref",\r
+                                       "value": "4"\r
+                               }\r
+                       ],\r
+                       "eventSourceType": "other",\r
+                       "vfStatus": "Active"\r
+               }\r
+       },\r
+       "_dl_type_": "JSON",\r
+       "_dl_text_": "{\"event\":{\"commonEventHeader\":{\"sourceId\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"startEpochMicrosec\":1537438335829000,\"eventId\":\"2ef8b41b-b081-477b-9d0b-1aaaa3b69857\",\"domain\":\"fault\",\"lastEpochMicrosec\":1537438335829000,\"eventName\":\"Fault_Route_Status\",\"sourceName\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"priority\":\"High\",\"version\":3,\"reportingEntityName\":\"Domain_Contorller\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"Route_Status\",\"faultFieldsVersion\":2,\"specificProblem\":\"Fault_SOTN_Service_Status\",\"alarmAdditionalInformation\":[{\"name\":\"networkId\",\"value\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100\"},{\"name\":\"node\",\"value\":\"11.11.11.11\"},{\"name\":\"tp-id\",\"value\":\"27\"},{\"name\":\"oper-status\",\"value\":\"down\"},{\"name\":\"network-ref\",\"value\":\"providerId/5555/clientId/6666/topologyId/33\"},{\"name\":\"node-ref\",\"value\":\"0.51.0.103\"},{\"name\":\"tp-ref\",\"value\":\"4\"}],\"eventSourceType\":\"other\",\"vfStatus\":\"Active\"}}}"\r
+}
\ No newline at end of file
index 934451f..02db5a2 100644 (file)
@@ -56,10 +56,6 @@ public class ApplicationConfigurationTest {
 
        @Test
        public void readConfig() {
-               assertNotNull(config.getCouchbaseHost());
-               assertNotNull(config.getCouchbaseUser());
-               assertNotNull(config.getCouchbasePass());
-               assertNotNull(config.getCouchbaseBucket());
 
                assertNotNull(config.getDmaapZookeeperHostPort());
                assertNotNull(config.getDmaapKafkaHostPort());
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/DbTest.java
new file mode 100644 (file)
index 0000000..ea1d689
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import org.junit.Test;
+/**
+ * Test Db
+ * 
+ * @author Guobiao Mo
+ *
+ */
+public class DbTest {  
+
+    @Test
+       public void testIs() {
+               Db couchbase=new Db("Couchbase");
+               Db mongoDB=new Db("MongoDB");
+               Db mongoDB2=new Db("MongoDB");
+               assertNotEquals(couchbase.hashCode(), mongoDB.hashCode());
+               assertNotEquals(couchbase, mongoDB);
+               assertEquals(mongoDB, mongoDB2);                
+       }
+}
index 23ec3b1..1e40252 100644 (file)
@@ -23,8 +23,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashSet;
+
 import org.json.JSONObject;
-import org.junit.Test; 
+import org.junit.Test;
+import org.onap.datalake.feeder.enumeration.DataFormat; 
  
 /**
  * Test Topic
@@ -49,18 +52,50 @@ public class TopicTest {
         assertEquals(value, "hello");          
     }
 
+    @Test
+    public void getMessageIdFromMultipleAttributes() {
+       String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}";
+       
+       JSONObject json = new JSONObject(text);
+       
+       Topic topic = new Topic("test getMessageId");
+       topic.setMessageIdPath("/data/data2/value,/data/data3");
+       
+       String value = topic.getMessageId(json);
+
+        assertEquals(value, "hello^world");            
+    }
+
     @Test
        public void testIs() {
-               Topic defaultTopic=new Topic("default");
+               Topic defaultTopic=new Topic("_DL_DEFAULT_");
                Topic testTopic = new Topic("test");
                testTopic.setDefaultTopic(defaultTopic);
+
+               assertTrue(defaultTopic.isDefault());
+               assertFalse(testTopic.isDefault());             
+
+               assertTrue(testTopic.equals(new Topic("test")));
+               assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
+               
+               defaultTopic.setDbs(new HashSet<>());
+               defaultTopic.getDbs().add(new Db("Elasticsearch"));             
+               assertTrue(testTopic.supportElasticsearch());
+               assertFalse(testTopic.supportCouchbase());
+               assertFalse(testTopic.supportDruid());
+               assertFalse(testTopic.supportMongoDB());                
+               
+               defaultTopic.getDbs().remove(new Db("Elasticsearch"));  
+               assertFalse(testTopic.supportElasticsearch());
                
-               defaultTopic.setSupportElasticsearch(true);             
-               boolean b = testTopic.isSupportElasticsearch();
-               assertTrue(b);
+               defaultTopic.setCorrelateClearedMessage(true);
+               defaultTopic.setDataFormat("XML");
+               defaultTopic.setEnabled(true);
+               defaultTopic.setSaveRaw(true);          
+               assertTrue(testTopic.isCorrelateClearedMessage());
+               assertTrue(testTopic.isEnabled());
+               assertTrue(testTopic.isSaveRaw()); 
                
-               defaultTopic.setSupportElasticsearch(false);            
-               b = testTopic.isSupportElasticsearch();
-               assertFalse(b);
+               assertEquals(defaultTopic.getDataFormat(), DataFormat.XML);
        }
 }
index ede5999..d6d98e6 100644 (file)
@@ -2,14 +2,6 @@
 server.port = 1680
 
 
-
-#For Beijing lab
-#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
-#dmaapKafkaHostPort=kafka.mr01.onap.vip:80
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-#couchbaseHost=172.30.1.74
-
 #DMaaP
 #dmaapZookeeperHostPort=127.0.0.1:2181
 #dmaapKafkaHostPort=127.0.0.1:9092
@@ -30,13 +22,4 @@ logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
  
-
-#DL Feeder DB: Couchbase
-couchbaseHost=dl_couchbase 
-#couchbaseHost=172.30.1.74
-couchbaseUser=dmaap
-couchbasePass=dmaap1234
-couchbaseBucket=dmaap
-
-#DL Feeder DB: Elasticsearch
-elasticsearchHost=dl_es
index ede2a27..a526cb5 100644 (file)
                <springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
                <jackson.version>2.9.6</jackson.version>
                <kafka.version>2.0.0</kafka.version>
-               <elasticsearchjava.version>6.6.0</elasticsearchjava.version>
+               <elasticsearchjava.version>6.7.0</elasticsearchjava.version>
 
        </properties>
 
        <dependencyManagement>
                <dependencies>
 
+                       <dependency>
+                       <groupId>org.mariadb.jdbc</groupId>
+                       <artifactId>mariadb-java-client</artifactId>
+                       <version>2.4.1</version>
+                       </dependency>
+
                        <dependency>
                                <groupId>commons-io</groupId>
                                <artifactId>commons-io</artifactId>
                                <version>${springboot.version}</version>
                        </dependency>
                        <!-- end::actuator[] -->
+                       
+               <dependency>
+               <groupId>org.springframework.boot</groupId>
+                  <artifactId>spring-boot-starter-data-jpa</artifactId>
+                               <version>${springboot.version}</version>
+                       </dependency>
+        
                        <dependency>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-starter-data-couchbase</artifactId>