From 35cc15e04411008b2f8094bbd3876e7a2daed587 Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Wed, 10 Apr 2019 00:35:43 -0700 Subject: [PATCH] Support MongoDB as a data storage Issue-ID: DCAEGEN2-1411 Change-Id: I06b69605e88d5b81500b788847e7c90ff4017a07 Signed-off-by: Guobiao Mo --- components/datalake-handler/feeder/pom.xml | 26 +++--- .../feeder/src/assembly/scripts/init_db.sql | 7 +- .../CollectibleDocumentFieldNameValidator.java | 60 +++++++++++++ .../feeder/controller/TopicController.java | 6 +- .../java/org/onap/datalake/feeder/domain/Db.java | 35 ++++---- .../org/onap/datalake/feeder/domain/Topic.java | 78 +++++++++-------- .../feeder/service/ElasticsearchService.java | 11 ++- .../datalake/feeder/service/MongodbService.java | 98 ++++++++++++++++++++++ .../onap/datalake/feeder/service/StoreService.java | 7 ++ .../onap/datalake/feeder/service/TopicService.java | 3 +- components/datalake-handler/pom.xml | 4 +- 11 files changed, 259 insertions(+), 76 deletions(-) create mode 100644 components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml index d6b50787..9c1bb785 100644 --- a/components/datalake-handler/feeder/pom.xml +++ b/components/datalake-handler/feeder/pom.xml @@ -17,17 +17,17 @@ - + - org.mariadb.jdbc - mariadb-java-client + org.mariadb.jdbc + mariadb-java-client - + org.json json - + org.apache.httpcomponents httpclient @@ -47,12 +47,12 @@ org.springframework.boot spring-boot-starter-actuator - - - org.springframework.boot - spring-boot-starter-data-jpa - - + + + org.springframework.boot + spring-boot-starter-data-jpa + + org.springframework.boot spring-boot-starter-data-couchbase @@ -148,6 +148,10 @@ compile + + org.mongodb + mongo-java-driver + diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql index 2185320a..83db9f1f 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -21,6 +21,7 @@ CREATE TABLE `topic` ( CREATE TABLE `db` ( `name` varchar(255) NOT NULL, `host` varchar(255) DEFAULT NULL, + `port` int(11) DEFAULT NULL, `login` varchar(255) DEFAULT NULL, `pass` varchar(255) DEFAULT NULL, `property1` varchar(255) DEFAULT NULL, @@ -40,9 +41,9 @@ CREATE TABLE `map_db_topic` ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap'); +insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dl','dl1234','dl'); insert into db (name,host) values ('Elasticsearch','dl_es'); -insert into db (name,host) values ('MongoDB','dl_mongodb'); +insert into db (name,host,port,property1) values ('MongoDB','dl_mongodb',27017,'datalake'); insert into db (name,host) values ('Druid','dl_druid'); @@ -51,4 +52,4 @@ insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_D insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0); -insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); +insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); diff --git a/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java new file mode 100644 index 00000000..2bc6faad --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java @@ -0,0 +1,60 @@ +package com.mongodb.internal.validator; + +//copy from https://github.com/mongodb/mongo-java-driver/blob/master/driver-core/src/main/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java +//allow inserting name with dot +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2018 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +import org.bson.FieldNameValidator; + +import java.util.Arrays; +import java.util.List; + +/** + * A field name validator for document that are meant for storage in MongoDB collections. It ensures that no fields contain a '.', + * or start with '$' (with the exception of "$db", "$ref", and "$id", so that DBRefs are not rejected). + * + *

This class should not be considered a part of the public API.

+ */ +public class CollectibleDocumentFieldNameValidator implements FieldNameValidator { + // Have to support DBRef fields + private static final List EXCEPTIONS = Arrays.asList("$db", "$ref", "$id"); + + @Override + public boolean validate(final String fieldName) { + if (fieldName == null) { + throw new IllegalArgumentException("Field name can not be null"); + } + + /* dl change + if (fieldName.contains(".")) { + return false; + }*/ + + if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) { + return false; + } + return true; + } + + @Override + public FieldNameValidator getValidatorForField(final String fieldName) { + return this; + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index bf9e417f..747a72c8 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -79,14 +79,14 @@ public class TopicController { @GetMapping("/dmaap/") @ResponseBody - @ApiOperation(value="List all topics in DMaaP.") + @ApiOperation(value="List all topic names in DMaaP.") public List listDmaapTopics() throws IOException { return dmaapService.getTopics(); } @GetMapping("/") @ResponseBody - @ApiOperation(value="List all topics' details.") + @ApiOperation(value="List all topics' settings.") public Iterable list() throws IOException { Iterable ret = topicRepository.findAll(); return ret; @@ -94,7 +94,7 @@ public class TopicController { @GetMapping("/{topicName}") @ResponseBody - @ApiOperation(value="Get a topic's details.") + @ApiOperation(value="Get a topic's settings.") public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException { Topic topic = topicService.getTopic(topicName); return topic; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java index bbaedadc..306af490 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Db.java @@ -31,7 +31,7 @@ import com.fasterxml.jackson.annotation.JsonBackReference; import lombok.Getter; import lombok.Setter; - + /** * Domain class representing bid data storage * @@ -47,37 +47,44 @@ public class Db { private String name; private String host; + private Integer port; private String login; private String pass; private String property1; private String property2; - private String property3; + private String property3; @JsonBackReference - @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL) + @ManyToMany(mappedBy = "dbs", cascade = CascadeType.ALL) /* - @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) - @JoinTable( name = "map_db_topic", - joinColumns = { @JoinColumn(name="db_name") }, - inverseJoinColumns = { @JoinColumn(name="topic_name") } - ) */ - protected Set topics; - + @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER) + @JoinTable( name = "map_db_topic", + joinColumns = { @JoinColumn(name="db_name") }, + inverseJoinColumns = { @JoinColumn(name="topic_name") } + ) */ + protected Set topics; + public Db() { } public Db(String name) { this.name = name; } - + @Override - public boolean equals(Object obj) { - return name.equals(((Db)obj).getName()); + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (this.getClass() != obj.getClass()) + return false; + + return name.equals(((Db) obj).getName()); } @Override public int hashCode() { - return name.hashCode(); + return name.hashCode(); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index e1da4d4d..20ebf94a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -41,9 +41,9 @@ import com.fasterxml.jackson.annotation.JsonBackReference; import lombok.Getter; import lombok.Setter; - + /** - * Domain class representing topic + * Domain class representing topic * * @author Guobiao Mo * @@ -56,7 +56,7 @@ public class Topic { @Id private String name;//topic name - @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) + @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) @JoinColumn(name = "default_topic", nullable = true) private Topic defaultTopic; @@ -67,18 +67,18 @@ public class Topic { //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL) @JsonBackReference //@JsonManagedReference - @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) + @ManyToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER) @JoinTable( name = "map_db_topic", joinColumns = { @JoinColumn(name="topic_name") }, inverseJoinColumns = { @JoinColumn(name="db_name") } ) protected Set dbs; - + /** - * indicate if we should monitor this topic + * indicate if we should monitor this topic */ private Boolean enabled; - + /** * save raw message text */ @@ -94,12 +94,12 @@ public class Topic { /** * TTL in day */ - private Integer ttl; - + private Integer ttl; + //if this flag is true, need to correlate alarm cleared message to previous alarm @Column(name = "correlate_cleared_message") private Boolean correlateClearedMessage; - + //the value in the JSON with this path will be used as DB id @Column(name = "message_id_path") private String messageIdPath; @@ -114,15 +114,15 @@ public class Topic { public boolean isDefault() { return "_DL_DEFAULT_".equals(name); } - + public boolean isEnabled() { - return is(enabled, Topic::isEnabled); + return is(enabled, Topic::isEnabled); } public boolean isCorrelateClearedMessage() { return is(correlateClearedMessage, Topic::isCorrelateClearedMessage); } - + public int getTtl() { if (ttl != null) { return ttl; @@ -130,9 +130,9 @@ public class Topic { return defaultTopic.getTtl(); } else { return 3650;//default to 10 years for safe - } + } } - + public DataFormat getDataFormat() { if (dataFormat != null) { return DataFormat.fromString(dataFormat); @@ -147,7 +147,7 @@ public class Topic { private boolean is(Boolean b, Predicate pre) { return is(b, pre, false); } - + private boolean is(Boolean b, Predicate pre, boolean defaultValue) { if (b != null) { return b; @@ -178,53 +178,59 @@ public class Topic { return containDb("MongoDB"); } - private boolean containDb(String dbName) { + private boolean containDb(String dbName) { Db db = new Db(dbName); - - if(dbs!=null && dbs.contains(db)) { + + if (dbs != null && dbs.contains(db)) { return true; } - + if (defaultTopic != null) { return defaultTopic.containDb(dbName); } else { return false; } } - + //extract DB id from JSON attributes, support multiple attributes public String getMessageId(JSONObject json) { String id = null; - - if(StringUtils.isNotBlank(messageIdPath)) { - String[] paths=messageIdPath.split(","); - - StringBuilder sb= new StringBuilder(); - for(int i=0; i0) { - sb.append('^'); + + if (StringUtils.isNotBlank(messageIdPath)) { + String[] paths = messageIdPath.split(","); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < paths.length; i++) { + if (i > 0) { + sb.append('^'); } - sb.append(json.query(paths[i]).toString()); + sb.append(json.query(paths[i]).toString()); } id = sb.toString(); } - + return id; } - + @Override public String toString() { return name; } @Override - public boolean equals(Object obj) { - return name.equals(((Topic)obj).getName()); + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (this.getClass() != obj.getClass()) + return false; + + return name.equals(((Topic) obj).getName()); } @Override public int hashCode() { - return name.hashCode(); + return name.hashCode(); } - + } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index 1f637e1a..fea07187 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -29,9 +29,9 @@ import javax.annotation.PreDestroy; import org.apache.http.HttpHost; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.client.indices.CreateIndexRequest; +import org.elasticsearch.client.indices.CreateIndexResponse; +import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; @@ -100,15 +100,14 @@ public class ElasticsearchService { public void ensureTableExist(String topic) throws IOException { String topicLower = topic.toLowerCase(); - GetIndexRequest request = new GetIndexRequest(); - request.indices(topicLower); + GetIndexRequest request = new GetIndexRequest(topicLower); boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); if(!exists){ CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged()); - } + } } //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java new file mode 100644 index 00000000..2b889215 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java @@ -0,0 +1,98 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2018 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.bson.Document; + +import org.json.JSONObject; + +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +/** + * Service to use MongoDB + * + * @author Guobiao Mo + * + */ +@Service +public class MongodbService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DbService dbService; + + private MongoDatabase database; + private MongoClient mongoClient; + private Map> mongoCollectionMap = new HashMap<>(); + + @PostConstruct + private void init() { + Db mongodb = dbService.getMongoDB(); + + mongoClient = new MongoClient(mongodb.getHost(), mongodb.getPort()); + database = mongoClient.getDatabase(mongodb.getProperty1()); + } + + @PreDestroy + public void cleanUp() { + mongoClient.close(); + } + + public void saveJsons(Topic topic, List jsons) { + List documents = new ArrayList<>(jsons.size()); + for (JSONObject json : jsons) { + //convert org.json JSONObject to MongoDB Document + Document doc = Document.parse(json.toString()); + + String id = topic.getMessageId(json); //id can be null + if (id != null) { + doc.put("_id", id); + } + documents.add(doc); + } + + String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]","");//remove - _ . + MongoCollection collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k)); + collection.insertMany(documents); + + log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments()); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 84e4fb7d..d9fe12a7 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -62,6 +62,9 @@ public class StoreService { @Autowired private TopicService topicService; + @Autowired + private MongodbService mongodbService; + @Autowired private CouchbaseService couchbaseService; @@ -152,6 +155,10 @@ public class StoreService { } private void saveJsons(Topic topic, List jsons) { + if (topic.supportMongoDB()) { + mongodbService.saveJsons(topic, jsons); + } + if (topic.supportCouchbase()) { couchbaseService.saveJsons(topic, jsons); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 4e10a365..cd5113cc 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -71,7 +71,8 @@ public class TopicService { public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { Topic topic = getTopic(topicStr); if (topic == null) { - topic = getDefaultTopic(); + topic = new Topic(topicStr); + topic.setDefaultTopic(getDefaultTopic()); } if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { diff --git a/components/datalake-handler/pom.xml b/components/datalake-handler/pom.xml index a526cb54..45db09db 100644 --- a/components/datalake-handler/pom.xml +++ b/components/datalake-handler/pom.xml @@ -26,12 +26,12 @@ UTF-8 1.8 - 3.8.1 + 3.10.1 2.1.0.RELEASE 3.1.2.RELEASE 2.9.6 2.0.0 - 6.7.0 + 7.0.0-rc2 -- 2.16.6