<dependencies>
-
+       
                <dependency>
-                       <groupId>org.mariadb.jdbc</groupId>
-                       <artifactId>mariadb-java-client</artifactId>
+               <groupId>org.mariadb.jdbc</groupId>
+               <artifactId>mariadb-java-client</artifactId>
                </dependency>
-
+       
                <dependency>
                        <groupId>org.json</groupId>
                        <artifactId>json</artifactId>
                </dependency>
-
+       
                <dependency>
                        <groupId>org.apache.httpcomponents</groupId>
                        <artifactId>httpclient</artifactId>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-actuator</artifactId>
                </dependency>
-
-               <dependency>
-                       <groupId>org.springframework.boot</groupId>
-                       <artifactId>spring-boot-starter-data-jpa</artifactId>
-               </dependency>
-
+        
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+        </dependency>
+        
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-data-couchbase</artifactId>
                        <scope>compile</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.mongodb</groupId>
+                       <artifactId>mongo-java-driver</artifactId>
+               </dependency>
        </dependencies>
 
        <build>
 
 CREATE TABLE `db` (\r
   `name` varchar(255) NOT NULL,\r
   `host` varchar(255) DEFAULT NULL,\r
+  `port` int(11) DEFAULT NULL,\r
   `login` varchar(255) DEFAULT NULL,\r
   `pass` varchar(255) DEFAULT NULL,\r
   `property1` varchar(255) DEFAULT NULL,\r
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
 \r
-insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');\r
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dl','dl1234','dl');\r
 insert into db (name,host) values ('Elasticsearch','dl_es');\r
-insert into db (name,host) values ('MongoDB','dl_mongodb');\r
+insert into db (name,host,port,property1) values ('MongoDB','dl_mongodb',27017,'datalake');\r
 insert into db (name,host) values ('Druid','dl_druid');\r
 \r
 \r
 insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);\r
 \r
 \r
-insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); \r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');\r
 
--- /dev/null
+package com.mongodb.internal.validator;\r
+\r
+//copy from https://github.com/mongodb/mongo-java-driver/blob/master/driver-core/src/main/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java\r
+//allow inserting name with dot\r
+/*\r
+* ============LICENSE_START=======================================================\r
+* ONAP : DataLake\r
+* ================================================================================\r
+* Copyright 2018 China Mobile\r
+*=================================================================================\r
+* Licensed under the Apache License, Version 2.0 (the "License");\r
+* you may not use this file except in compliance with the License.\r
+* You may obtain a copy of the License at\r
+*\r
+*     http://www.apache.org/licenses/LICENSE-2.0\r
+*\r
+* Unless required by applicable law or agreed to in writing, software\r
+* distributed under the License is distributed on an "AS IS" BASIS,\r
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+* See the License for the specific language governing permissions and\r
+* limitations under the License.\r
+* ============LICENSE_END=========================================================\r
+*/\r
+import org.bson.FieldNameValidator;\r
+\r
+import java.util.Arrays;\r
+import java.util.List;\r
+\r
+/**\r
+ * A field name validator for document that are meant for storage in MongoDB collections.  It ensures that no fields contain a '.',\r
+ * or start with '$' (with the exception of "$db", "$ref", and "$id", so that DBRefs are not rejected).\r
+ *\r
+ * <p>This class should not be considered a part of the public API.</p>\r
+ */\r
+public class CollectibleDocumentFieldNameValidator implements FieldNameValidator {\r
+    // Have to support DBRef fields\r
+    private static final List<String> EXCEPTIONS = Arrays.asList("$db", "$ref", "$id");\r
+\r
+    @Override\r
+    public boolean validate(final String fieldName) {\r
+        if (fieldName == null) {\r
+            throw new IllegalArgumentException("Field name can not be null");\r
+        }\r
+\r
+        /* dl change\r
+        if (fieldName.contains(".")) {\r
+            return false;\r
+        }*/\r
+\r
+        if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) {\r
+            return false;\r
+        }\r
+        return true;\r
+    }\r
+\r
+    @Override\r
+    public FieldNameValidator getValidatorForField(final String fieldName) {\r
+        return this;\r
+    }\r
+}\r
 
        
        @GetMapping("/dmaap/")
        @ResponseBody
-       @ApiOperation(value="List all topics in DMaaP.")
+       @ApiOperation(value="List all topic names in DMaaP.")
        public List<String> listDmaapTopics() throws IOException {
                return dmaapService.getTopics();
        }
 
        @GetMapping("/")
        @ResponseBody
-       @ApiOperation(value="List all topics' details.")
+       @ApiOperation(value="List all topics' settings.")
        public Iterable<Topic> list() throws IOException {
                Iterable<Topic> ret = topicRepository.findAll();
                return ret;
 
        @GetMapping("/{topicName}")
        @ResponseBody
-       @ApiOperation(value="Get a topic's details.")
+       @ApiOperation(value="Get a topic's settings.")
        public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
                Topic topic = topicService.getTopic(topicName);
                return topic;
 
 
 import lombok.Getter;
 import lombok.Setter;
- 
+
 /**
  * Domain class representing bid data storage
  * 
        private String name;
 
        private String host;
+       private Integer port;
        private String login;
        private String pass;
 
        private String property1;
        private String property2;
-       private String property3;        
+       private String property3;
 
        @JsonBackReference
-       @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+       @ManyToMany(mappedBy = "dbs", cascade = CascadeType.ALL)
        /*
-    @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) 
-    @JoinTable(        name                            = "map_db_topic",
-                       joinColumns             = {  @JoinColumn(name="db_name")  },
-                       inverseJoinColumns      = {  @JoinColumn(name="topic_name")  }
-    ) */
-    protected Set<Topic> topics;       
-       
+       @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) 
+       @JoinTable(     name                            = "map_db_topic",
+                               joinColumns             = {  @JoinColumn(name="db_name")  },
+                               inverseJoinColumns      = {  @JoinColumn(name="topic_name")  }
+       ) */
+       protected Set<Topic> topics;
+
        public Db() {
        }
 
        public Db(String name) {
                this.name = name;
        }
- 
+
        @Override
-       public boolean equals(Object obj) {             
-               return name.equals(((Db)obj).getName());                
+       public boolean equals(Object obj) {
+               if (obj == null)
+                       return false;
+
+               if (this.getClass() != obj.getClass())
+                       return false;
+
+               return name.equals(((Db) obj).getName());
        }
 
        @Override
        public int hashCode() {
-               return name.hashCode();         
+               return name.hashCode();
        }
 }
 
 
 import lombok.Getter;
 import lombok.Setter;
- 
+
 /**
- * Domain class representing topic 
+ * Domain class representing topic
  * 
  * @author Guobiao Mo
  *
        @Id
        private String name;//topic name 
 
-       @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) 
+       @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
        @JoinColumn(name = "default_topic", nullable = true)
        private Topic defaultTopic;
 
        //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
        @JsonBackReference
        //@JsonManagedReference
-    @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) 
+       @ManyToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
     @JoinTable(        name                            = "map_db_topic",
                        joinColumns             = {  @JoinColumn(name="topic_name")  },
                        inverseJoinColumns      = {  @JoinColumn(name="db_name")  }
     )
        protected Set<Db> dbs;
-       
+
        /**
-        *  indicate if we should monitor this topic
+        * indicate if we should monitor this topic
         */
        private Boolean enabled;
-       
+
        /**
         * save raw message text
         */
        /**
         * TTL in day
         */
-       private Integer ttl; 
-       
+       private Integer ttl;
+
        //if this flag is true, need to correlate alarm cleared message to previous alarm 
        @Column(name = "correlate_cleared_message")
        private Boolean correlateClearedMessage;
-       
+
        //the value in the JSON with this path will be used as DB id
        @Column(name = "message_id_path")
        private String messageIdPath;
        public boolean isDefault() {
                return "_DL_DEFAULT_".equals(name);
        }
-       
+
        public boolean isEnabled() {
-               return is(enabled, Topic::isEnabled);   
+               return is(enabled, Topic::isEnabled);
        }
 
        public boolean isCorrelateClearedMessage() {
                return is(correlateClearedMessage, Topic::isCorrelateClearedMessage);
        }
-       
+
        public int getTtl() {
                if (ttl != null) {
                        return ttl;
                        return defaultTopic.getTtl();
                } else {
                        return 3650;//default to 10 years for safe
-               }               
+               }
        }
-       
+
        public DataFormat getDataFormat() {
                if (dataFormat != null) {
                        return DataFormat.fromString(dataFormat);
        private boolean is(Boolean b, Predicate<Topic> pre) {
                return is(b, pre, false);
        }
-       
+
        private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) {
                if (b != null) {
                        return b;
                return containDb("MongoDB");
        }
 
-       private boolean containDb(String dbName) {              
+       private boolean containDb(String dbName) {
                Db db = new Db(dbName);
-               
-               if(dbs!=null && dbs.contains(db)) {
+
+               if (dbs != null && dbs.contains(db)) {
                        return true;
                }
-               
+
                if (defaultTopic != null) {
                        return defaultTopic.containDb(dbName);
                } else {
                        return false;
                }
        }
-       
+
        //extract DB id from JSON attributes, support multiple attributes
        public String getMessageId(JSONObject json) {
                String id = null;
-               
-               if(StringUtils.isNotBlank(messageIdPath)) {
-                       String[] paths=messageIdPath.split(",");
-                       
-                       StringBuilder sb= new StringBuilder();
-                       for(int i=0; i<paths.length; i++) {
-                               if(i>0) {
-                                       sb.append('^');                                 
+
+               if (StringUtils.isNotBlank(messageIdPath)) {
+                       String[] paths = messageIdPath.split(",");
+
+                       StringBuilder sb = new StringBuilder();
+                       for (int i = 0; i < paths.length; i++) {
+                               if (i > 0) {
+                                       sb.append('^');
                                }
-                               sb.append(json.query(paths[i]).toString());                             
+                               sb.append(json.query(paths[i]).toString());
                        }
                        id = sb.toString();
                }
-               
+
                return id;
        }
-       
+
        @Override
        public String toString() {
                return name;
        }
 
        @Override
-       public boolean equals(Object obj) {             
-               return name.equals(((Topic)obj).getName());             
+       public boolean equals(Object obj) {
+               if (obj == null)
+                       return false;
+
+               if (this.getClass() != obj.getClass())
+                       return false;
+
+               return name.equals(((Topic) obj).getName());
        }
 
        @Override
        public int hashCode() {
-               return name.hashCode();         
+               return name.hashCode();
        }
-       
+
 }
 
 import org.apache.http.HttpHost;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest; 
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
        public void ensureTableExist(String topic) throws IOException {
                String topicLower = topic.toLowerCase();
                
-               GetIndexRequest request = new GetIndexRequest();
-               request.indices(topicLower);
+               GetIndexRequest request = new GetIndexRequest(topicLower); 
                
                boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
                if(!exists){
                        CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); 
                        CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
                        log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
-               } 
+               }
        }
        
        //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
 
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.bson.Document;
+
+import org.json.JSONObject;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Service to use MongoDB
+ * 
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class MongodbService {
+
+       private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+       @Autowired
+       private DbService dbService;
+
+       private MongoDatabase database;
+       private MongoClient mongoClient;
+       private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
+
+       @PostConstruct
+       private void init() {
+               Db mongodb = dbService.getMongoDB();
+
+               mongoClient = new MongoClient(mongodb.getHost(), mongodb.getPort());
+               database = mongoClient.getDatabase(mongodb.getProperty1());
+       }
+
+       @PreDestroy
+       public void cleanUp() {
+               mongoClient.close();
+       }
+
+       public void saveJsons(Topic topic, List<JSONObject> jsons) {
+               List<Document> documents = new ArrayList<>(jsons.size());
+               for (JSONObject json : jsons) {
+                       //convert org.json JSONObject to MongoDB Document
+                       Document doc = Document.parse(json.toString());
+
+                       String id = topic.getMessageId(json); //id can be null
+                       if (id != null) {
+                               doc.put("_id", id);
+                       }
+                       documents.add(doc);
+               }
+
+               String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]","");//remove - _ .
+               MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
+               collection.insertMany(documents);
+
+               log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments());
+       }
+
+}
 
        @Autowired
        private TopicService topicService;
 
+       @Autowired
+       private MongodbService mongodbService;
+
        @Autowired
        private CouchbaseService couchbaseService;
 
        }
 
        private void saveJsons(Topic topic, List<JSONObject> jsons) {
+               if (topic.supportMongoDB()) {
+                       mongodbService.saveJsons(topic, jsons);
+               }
+
                if (topic.supportCouchbase()) {
                        couchbaseService.saveJsons(topic, jsons);
                }
 
        public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
                Topic topic = getTopic(topicStr);
                if (topic == null) {
-                       topic = getDefaultTopic();
+                       topic = new Topic(topicStr);
+                       topic.setDefaultTopic(getDefaultTopic());
                }
                
                if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { 
 
                <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
                <java.version>1.8</java.version>
 
-               <mongojava.version>3.8.1</mongojava.version>
+               <mongojava.version>3.10.1</mongojava.version>
                <springboot.version>2.1.0.RELEASE</springboot.version>
                <springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
                <jackson.version>2.9.6</jackson.version>
                <kafka.version>2.0.0</kafka.version>
-               <elasticsearchjava.version>6.7.0</elasticsearchjava.version>
+               <elasticsearchjava.version>7.0.0-rc2</elasticsearchjava.version>
 
        </properties>