Support MongoDB as a data storage 06/84806/1
authorGuobiao Mo <guobiaomo@chinamobile.com>
Wed, 10 Apr 2019 07:35:43 +0000 (00:35 -0700)
committerGuobiao Mo <guobiaomo@chinamobile.com>
Wed, 10 Apr 2019 07:35:43 +0000 (00:35 -0700)
Issue-ID: DCAEGEN2-1411
Change-Id: I06b69605e88d5b81500b788847e7c90ff4017a07
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
components/datalake-handler/feeder/pom.xml
components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java [new file with mode: 0644]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java [new file with mode: 0644]
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
components/datalake-handler/pom.xml

index d6b5078..9c1bb78 100644 (file)
 
 
        <dependencies>
-
+       
                <dependency>
-                       <groupId>org.mariadb.jdbc</groupId>
-                       <artifactId>mariadb-java-client</artifactId>
+               <groupId>org.mariadb.jdbc</groupId>
+               <artifactId>mariadb-java-client</artifactId>
                </dependency>
-
+       
                <dependency>
                        <groupId>org.json</groupId>
                        <artifactId>json</artifactId>
                </dependency>
-
+       
                <dependency>
                        <groupId>org.apache.httpcomponents</groupId>
                        <artifactId>httpclient</artifactId>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-actuator</artifactId>
                </dependency>
-
-               <dependency>
-                       <groupId>org.springframework.boot</groupId>
-                       <artifactId>spring-boot-starter-data-jpa</artifactId>
-               </dependency>
-
+        
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+        </dependency>
+        
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-data-couchbase</artifactId>
                        <scope>compile</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.mongodb</groupId>
+                       <artifactId>mongo-java-driver</artifactId>
+               </dependency>
        </dependencies>
 
        <build>
index 2185320..83db9f1 100644 (file)
@@ -21,6 +21,7 @@ CREATE TABLE `topic` (
 CREATE TABLE `db` (\r
   `name` varchar(255) NOT NULL,\r
   `host` varchar(255) DEFAULT NULL,\r
+  `port` int(11) DEFAULT NULL,\r
   `login` varchar(255) DEFAULT NULL,\r
   `pass` varchar(255) DEFAULT NULL,\r
   `property1` varchar(255) DEFAULT NULL,\r
@@ -40,9 +41,9 @@ CREATE TABLE `map_db_topic` (
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
 \r
-insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');\r
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dl','dl1234','dl');\r
 insert into db (name,host) values ('Elasticsearch','dl_es');\r
-insert into db (name,host) values ('MongoDB','dl_mongodb');\r
+insert into db (name,host,port,property1) values ('MongoDB','dl_mongodb',27017,'datalake');\r
 insert into db (name,host) values ('Druid','dl_druid');\r
 \r
 \r
@@ -51,4 +52,4 @@ insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_D
 insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);\r
 \r
 \r
-insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); \r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');\r
diff --git a/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java
new file mode 100644 (file)
index 0000000..2bc6faa
--- /dev/null
@@ -0,0 +1,60 @@
+package com.mongodb.internal.validator;\r
+\r
+//copy from https://github.com/mongodb/mongo-java-driver/blob/master/driver-core/src/main/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java\r
+//allow inserting name with dot\r
+/*\r
+* ============LICENSE_START=======================================================\r
+* ONAP : DataLake\r
+* ================================================================================\r
+* Copyright 2018 China Mobile\r
+*=================================================================================\r
+* Licensed under the Apache License, Version 2.0 (the "License");\r
+* you may not use this file except in compliance with the License.\r
+* You may obtain a copy of the License at\r
+*\r
+*     http://www.apache.org/licenses/LICENSE-2.0\r
+*\r
+* Unless required by applicable law or agreed to in writing, software\r
+* distributed under the License is distributed on an "AS IS" BASIS,\r
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+* See the License for the specific language governing permissions and\r
+* limitations under the License.\r
+* ============LICENSE_END=========================================================\r
+*/\r
+import org.bson.FieldNameValidator;\r
+\r
+import java.util.Arrays;\r
+import java.util.List;\r
+\r
+/**\r
+ * A field name validator for document that are meant for storage in MongoDB collections.  It ensures that no fields contain a '.',\r
+ * or start with '$' (with the exception of "$db", "$ref", and "$id", so that DBRefs are not rejected).\r
+ *\r
+ * <p>This class should not be considered a part of the public API.</p>\r
+ */\r
+public class CollectibleDocumentFieldNameValidator implements FieldNameValidator {\r
+    // Have to support DBRef fields\r
+    private static final List<String> EXCEPTIONS = Arrays.asList("$db", "$ref", "$id");\r
+\r
+    @Override\r
+    public boolean validate(final String fieldName) {\r
+        if (fieldName == null) {\r
+            throw new IllegalArgumentException("Field name can not be null");\r
+        }\r
+\r
+        /* dl change\r
+        if (fieldName.contains(".")) {\r
+            return false;\r
+        }*/\r
+\r
+        if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) {\r
+            return false;\r
+        }\r
+        return true;\r
+    }\r
+\r
+    @Override\r
+    public FieldNameValidator getValidatorForField(final String fieldName) {\r
+        return this;\r
+    }\r
+}\r
index bf9e417..747a72c 100644 (file)
@@ -79,14 +79,14 @@ public class TopicController {
        
        @GetMapping("/dmaap/")
        @ResponseBody
-       @ApiOperation(value="List all topics in DMaaP.")
+       @ApiOperation(value="List all topic names in DMaaP.")
        public List<String> listDmaapTopics() throws IOException {
                return dmaapService.getTopics();
        }
 
        @GetMapping("/")
        @ResponseBody
-       @ApiOperation(value="List all topics' details.")
+       @ApiOperation(value="List all topics' settings.")
        public Iterable<Topic> list() throws IOException {
                Iterable<Topic> ret = topicRepository.findAll();
                return ret;
@@ -94,7 +94,7 @@ public class TopicController {
 
        @GetMapping("/{topicName}")
        @ResponseBody
-       @ApiOperation(value="Get a topic's details.")
+       @ApiOperation(value="Get a topic's settings.")
        public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
                Topic topic = topicService.getTopic(topicName);
                return topic;
index bbaedad..306af49 100644 (file)
@@ -31,7 +31,7 @@ import com.fasterxml.jackson.annotation.JsonBackReference;
 
 import lombok.Getter;
 import lombok.Setter;
+
 /**
  * Domain class representing bid data storage
  * 
@@ -47,37 +47,44 @@ public class Db {
        private String name;
 
        private String host;
+       private Integer port;
        private String login;
        private String pass;
 
        private String property1;
        private String property2;
-       private String property3;        
+       private String property3;
 
        @JsonBackReference
-       @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+       @ManyToMany(mappedBy = "dbs", cascade = CascadeType.ALL)
        /*
-    @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) 
-    @JoinTable(        name                            = "map_db_topic",
-                       joinColumns             = {  @JoinColumn(name="db_name")  },
-                       inverseJoinColumns      = {  @JoinColumn(name="topic_name")  }
-    ) */
-    protected Set<Topic> topics;       
-       
+       @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) 
+       @JoinTable(     name                            = "map_db_topic",
+                               joinColumns             = {  @JoinColumn(name="db_name")  },
+                               inverseJoinColumns      = {  @JoinColumn(name="topic_name")  }
+       ) */
+       protected Set<Topic> topics;
+
        public Db() {
        }
 
        public Db(String name) {
                this.name = name;
        }
+
        @Override
-       public boolean equals(Object obj) {             
-               return name.equals(((Db)obj).getName());                
+       public boolean equals(Object obj) {
+               if (obj == null)
+                       return false;
+
+               if (this.getClass() != obj.getClass())
+                       return false;
+
+               return name.equals(((Db) obj).getName());
        }
 
        @Override
        public int hashCode() {
-               return name.hashCode();         
+               return name.hashCode();
        }
 }
index e1da4d4..20ebf94 100644 (file)
@@ -41,9 +41,9 @@ import com.fasterxml.jackson.annotation.JsonBackReference;
 
 import lombok.Getter;
 import lombok.Setter;
+
 /**
- * Domain class representing topic 
+ * Domain class representing topic
  * 
  * @author Guobiao Mo
  *
@@ -56,7 +56,7 @@ public class Topic {
        @Id
        private String name;//topic name 
 
-       @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) 
+       @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
        @JoinColumn(name = "default_topic", nullable = true)
        private Topic defaultTopic;
 
@@ -67,18 +67,18 @@ public class Topic {
        //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
        @JsonBackReference
        //@JsonManagedReference
-    @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) 
+       @ManyToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
     @JoinTable(        name                            = "map_db_topic",
                        joinColumns             = {  @JoinColumn(name="topic_name")  },
                        inverseJoinColumns      = {  @JoinColumn(name="db_name")  }
     )
        protected Set<Db> dbs;
-       
+
        /**
-        *  indicate if we should monitor this topic
+        * indicate if we should monitor this topic
         */
        private Boolean enabled;
-       
+
        /**
         * save raw message text
         */
@@ -94,12 +94,12 @@ public class Topic {
        /**
         * TTL in day
         */
-       private Integer ttl; 
-       
+       private Integer ttl;
+
        //if this flag is true, need to correlate alarm cleared message to previous alarm 
        @Column(name = "correlate_cleared_message")
        private Boolean correlateClearedMessage;
-       
+
        //the value in the JSON with this path will be used as DB id
        @Column(name = "message_id_path")
        private String messageIdPath;
@@ -114,15 +114,15 @@ public class Topic {
        public boolean isDefault() {
                return "_DL_DEFAULT_".equals(name);
        }
-       
+
        public boolean isEnabled() {
-               return is(enabled, Topic::isEnabled);   
+               return is(enabled, Topic::isEnabled);
        }
 
        public boolean isCorrelateClearedMessage() {
                return is(correlateClearedMessage, Topic::isCorrelateClearedMessage);
        }
-       
+
        public int getTtl() {
                if (ttl != null) {
                        return ttl;
@@ -130,9 +130,9 @@ public class Topic {
                        return defaultTopic.getTtl();
                } else {
                        return 3650;//default to 10 years for safe
-               }               
+               }
        }
-       
+
        public DataFormat getDataFormat() {
                if (dataFormat != null) {
                        return DataFormat.fromString(dataFormat);
@@ -147,7 +147,7 @@ public class Topic {
        private boolean is(Boolean b, Predicate<Topic> pre) {
                return is(b, pre, false);
        }
-       
+
        private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) {
                if (b != null) {
                        return b;
@@ -178,53 +178,59 @@ public class Topic {
                return containDb("MongoDB");
        }
 
-       private boolean containDb(String dbName) {              
+       private boolean containDb(String dbName) {
                Db db = new Db(dbName);
-               
-               if(dbs!=null && dbs.contains(db)) {
+
+               if (dbs != null && dbs.contains(db)) {
                        return true;
                }
-               
+
                if (defaultTopic != null) {
                        return defaultTopic.containDb(dbName);
                } else {
                        return false;
                }
        }
-       
+
        //extract DB id from JSON attributes, support multiple attributes
        public String getMessageId(JSONObject json) {
                String id = null;
-               
-               if(StringUtils.isNotBlank(messageIdPath)) {
-                       String[] paths=messageIdPath.split(",");
-                       
-                       StringBuilder sb= new StringBuilder();
-                       for(int i=0; i<paths.length; i++) {
-                               if(i>0) {
-                                       sb.append('^');                                 
+
+               if (StringUtils.isNotBlank(messageIdPath)) {
+                       String[] paths = messageIdPath.split(",");
+
+                       StringBuilder sb = new StringBuilder();
+                       for (int i = 0; i < paths.length; i++) {
+                               if (i > 0) {
+                                       sb.append('^');
                                }
-                               sb.append(json.query(paths[i]).toString());                             
+                               sb.append(json.query(paths[i]).toString());
                        }
                        id = sb.toString();
                }
-               
+
                return id;
        }
-       
+
        @Override
        public String toString() {
                return name;
        }
 
        @Override
-       public boolean equals(Object obj) {             
-               return name.equals(((Topic)obj).getName());             
+       public boolean equals(Object obj) {
+               if (obj == null)
+                       return false;
+
+               if (this.getClass() != obj.getClass())
+                       return false;
+
+               return name.equals(((Topic) obj).getName());
        }
 
        @Override
        public int hashCode() {
-               return name.hashCode();         
+               return name.hashCode();
        }
-       
+
 }
index 1f637e1..fea0718 100644 (file)
@@ -29,9 +29,9 @@ import javax.annotation.PreDestroy;
 import org.apache.http.HttpHost;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest; 
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
@@ -100,15 +100,14 @@ public class ElasticsearchService {
        public void ensureTableExist(String topic) throws IOException {
                String topicLower = topic.toLowerCase();
                
-               GetIndexRequest request = new GetIndexRequest();
-               request.indices(topicLower);
+               GetIndexRequest request = new GetIndexRequest(topicLower); 
                
                boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
                if(!exists){
                        CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); 
                        CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
                        log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
-               } 
+               }
        }
        
        //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
new file mode 100644 (file)
index 0000000..2b88921
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.bson.Document;
+
+import org.json.JSONObject;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Service to use MongoDB
+ * 
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class MongodbService {
+
+       private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+       @Autowired
+       private DbService dbService;
+
+       private MongoDatabase database;
+       private MongoClient mongoClient;
+       private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
+
+       @PostConstruct
+       private void init() {
+               Db mongodb = dbService.getMongoDB();
+
+               mongoClient = new MongoClient(mongodb.getHost(), mongodb.getPort());
+               database = mongoClient.getDatabase(mongodb.getProperty1());
+       }
+
+       @PreDestroy
+       public void cleanUp() {
+               mongoClient.close();
+       }
+
+       public void saveJsons(Topic topic, List<JSONObject> jsons) {
+               List<Document> documents = new ArrayList<>(jsons.size());
+               for (JSONObject json : jsons) {
+                       //convert org.json JSONObject to MongoDB Document
+                       Document doc = Document.parse(json.toString());
+
+                       String id = topic.getMessageId(json); //id can be null
+                       if (id != null) {
+                               doc.put("_id", id);
+                       }
+                       documents.add(doc);
+               }
+
+               String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]","");//remove - _ .
+               MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
+               collection.insertMany(documents);
+
+               log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments());
+       }
+
+}
index 84e4fb7..d9fe12a 100644 (file)
@@ -62,6 +62,9 @@ public class StoreService {
        @Autowired
        private TopicService topicService;
 
+       @Autowired
+       private MongodbService mongodbService;
+
        @Autowired
        private CouchbaseService couchbaseService;
 
@@ -152,6 +155,10 @@ public class StoreService {
        }
 
        private void saveJsons(Topic topic, List<JSONObject> jsons) {
+               if (topic.supportMongoDB()) {
+                       mongodbService.saveJsons(topic, jsons);
+               }
+
                if (topic.supportCouchbase()) {
                        couchbaseService.saveJsons(topic, jsons);
                }
index 4e10a36..cd5113c 100644 (file)
@@ -71,7 +71,8 @@ public class TopicService {
        public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
                Topic topic = getTopic(topicStr);
                if (topic == null) {
-                       topic = getDefaultTopic();
+                       topic = new Topic(topicStr);
+                       topic.setDefaultTopic(getDefaultTopic());
                }
                
                if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { 
index a526cb5..45db09d 100644 (file)
                <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
                <java.version>1.8</java.version>
 
-               <mongojava.version>3.8.1</mongojava.version>
+               <mongojava.version>3.10.1</mongojava.version>
                <springboot.version>2.1.0.RELEASE</springboot.version>
                <springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
                <jackson.version>2.9.6</jackson.version>
                <kafka.version>2.0.0</kafka.version>
-               <elasticsearchjava.version>6.7.0</elasticsearchjava.version>
+               <elasticsearchjava.version>7.0.0-rc2</elasticsearchjava.version>
 
        </properties>