<dependencies>
-
+
<dependency>
- <groupId>org.mariadb.jdbc</groupId>
- <artifactId>mariadb-java-client</artifactId>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
</dependency>
-
+
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</dependency>
-
+
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-jpa</artifactId>
- </dependency>
-
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-couchbase</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ </dependency>
</dependencies>
<build>
CREATE TABLE `db` (\r
`name` varchar(255) NOT NULL,\r
`host` varchar(255) DEFAULT NULL,\r
+ `port` int(11) DEFAULT NULL,\r
`login` varchar(255) DEFAULT NULL,\r
`pass` varchar(255) DEFAULT NULL,\r
`property1` varchar(255) DEFAULT NULL,\r
) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
\r
\r
-insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');\r
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dl','dl1234','dl');\r
insert into db (name,host) values ('Elasticsearch','dl_es');\r
-insert into db (name,host) values ('MongoDB','dl_mongodb');\r
+insert into db (name,host,port,property1) values ('MongoDB','dl_mongodb',27017,'datalake');\r
insert into db (name,host) values ('Druid','dl_druid');\r
\r
\r
insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);\r
\r
\r
-insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); \r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_');\r
--- /dev/null
+package com.mongodb.internal.validator;\r
+\r
+//copy from https://github.com/mongodb/mongo-java-driver/blob/master/driver-core/src/main/com/mongodb/internal/validator/CollectibleDocumentFieldNameValidator.java\r
+//allow inserting name with dot\r
+/*\r
+* ============LICENSE_START=======================================================\r
+* ONAP : DataLake\r
+* ================================================================================\r
+* Copyright 2018 China Mobile\r
+*=================================================================================\r
+* Licensed under the Apache License, Version 2.0 (the "License");\r
+* you may not use this file except in compliance with the License.\r
+* You may obtain a copy of the License at\r
+*\r
+* http://www.apache.org/licenses/LICENSE-2.0\r
+*\r
+* Unless required by applicable law or agreed to in writing, software\r
+* distributed under the License is distributed on an "AS IS" BASIS,\r
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+* See the License for the specific language governing permissions and\r
+* limitations under the License.\r
+* ============LICENSE_END=========================================================\r
+*/\r
+import org.bson.FieldNameValidator;\r
+\r
+import java.util.Arrays;\r
+import java.util.List;\r
+\r
+/**\r
+ * A field name validator for document that are meant for storage in MongoDB collections. It ensures that no fields contain a '.',\r
+ * or start with '$' (with the exception of "$db", "$ref", and "$id", so that DBRefs are not rejected).\r
+ *\r
+ * <p>This class should not be considered a part of the public API.</p>\r
+ */\r
+public class CollectibleDocumentFieldNameValidator implements FieldNameValidator {\r
+ // Have to support DBRef fields\r
+ private static final List<String> EXCEPTIONS = Arrays.asList("$db", "$ref", "$id");\r
+\r
+ @Override\r
+ public boolean validate(final String fieldName) {\r
+ if (fieldName == null) {\r
+ throw new IllegalArgumentException("Field name can not be null");\r
+ }\r
+\r
+ /* dl change\r
+ if (fieldName.contains(".")) {\r
+ return false;\r
+ }*/\r
+\r
+ if (fieldName.startsWith("$") && !EXCEPTIONS.contains(fieldName)) {\r
+ return false;\r
+ }\r
+ return true;\r
+ }\r
+\r
+ @Override\r
+ public FieldNameValidator getValidatorForField(final String fieldName) {\r
+ return this;\r
+ }\r
+}\r
@GetMapping("/dmaap/")
@ResponseBody
- @ApiOperation(value="List all topics in DMaaP.")
+ @ApiOperation(value="List all topic names in DMaaP.")
public List<String> listDmaapTopics() throws IOException {
return dmaapService.getTopics();
}
@GetMapping("/")
@ResponseBody
- @ApiOperation(value="List all topics' details.")
+ @ApiOperation(value="List all topics' settings.")
public Iterable<Topic> list() throws IOException {
Iterable<Topic> ret = topicRepository.findAll();
return ret;
@GetMapping("/{topicName}")
@ResponseBody
- @ApiOperation(value="Get a topic's details.")
+ @ApiOperation(value="Get a topic's settings.")
public Topic getTopic(@PathVariable("topicName") String topicName) throws IOException {
Topic topic = topicService.getTopic(topicName);
return topic;
import lombok.Getter;
import lombok.Setter;
-
+
/**
* Domain class representing bid data storage
*
private String name;
private String host;
+ private Integer port;
private String login;
private String pass;
private String property1;
private String property2;
- private String property3;
+ private String property3;
@JsonBackReference
- @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+ @ManyToMany(mappedBy = "dbs", cascade = CascadeType.ALL)
/*
- @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER)
- @JoinTable( name = "map_db_topic",
- joinColumns = { @JoinColumn(name="db_name") },
- inverseJoinColumns = { @JoinColumn(name="topic_name") }
- ) */
- protected Set<Topic> topics;
-
+ @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="db_name") },
+ inverseJoinColumns = { @JoinColumn(name="topic_name") }
+ ) */
+ protected Set<Topic> topics;
+
public Db() {
}
public Db(String name) {
this.name = name;
}
-
+
@Override
- public boolean equals(Object obj) {
- return name.equals(((Db)obj).getName());
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return name.equals(((Db) obj).getName());
}
@Override
public int hashCode() {
- return name.hashCode();
+ return name.hashCode();
}
}
import lombok.Getter;
import lombok.Setter;
-
+
/**
- * Domain class representing topic
+ * Domain class representing topic
*
* @author Guobiao Mo
*
@Id
private String name;//topic name
- @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
+ @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
@JoinColumn(name = "default_topic", nullable = true)
private Topic defaultTopic;
//@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
@JsonBackReference
//@JsonManagedReference
- @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER)
+ @ManyToMany(cascade = CascadeType.ALL, fetch = FetchType.EAGER)
@JoinTable( name = "map_db_topic",
joinColumns = { @JoinColumn(name="topic_name") },
inverseJoinColumns = { @JoinColumn(name="db_name") }
)
protected Set<Db> dbs;
-
+
/**
- * indicate if we should monitor this topic
+ * indicate if we should monitor this topic
*/
private Boolean enabled;
-
+
/**
* save raw message text
*/
/**
* TTL in day
*/
- private Integer ttl;
-
+ private Integer ttl;
+
//if this flag is true, need to correlate alarm cleared message to previous alarm
@Column(name = "correlate_cleared_message")
private Boolean correlateClearedMessage;
-
+
//the value in the JSON with this path will be used as DB id
@Column(name = "message_id_path")
private String messageIdPath;
public boolean isDefault() {
return "_DL_DEFAULT_".equals(name);
}
-
+
public boolean isEnabled() {
- return is(enabled, Topic::isEnabled);
+ return is(enabled, Topic::isEnabled);
}
public boolean isCorrelateClearedMessage() {
return is(correlateClearedMessage, Topic::isCorrelateClearedMessage);
}
-
+
public int getTtl() {
if (ttl != null) {
return ttl;
return defaultTopic.getTtl();
} else {
return 3650;//default to 10 years for safe
- }
+ }
}
-
+
public DataFormat getDataFormat() {
if (dataFormat != null) {
return DataFormat.fromString(dataFormat);
private boolean is(Boolean b, Predicate<Topic> pre) {
return is(b, pre, false);
}
-
+
private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) {
if (b != null) {
return b;
return containDb("MongoDB");
}
- private boolean containDb(String dbName) {
+ private boolean containDb(String dbName) {
Db db = new Db(dbName);
-
- if(dbs!=null && dbs.contains(db)) {
+
+ if (dbs != null && dbs.contains(db)) {
return true;
}
-
+
if (defaultTopic != null) {
return defaultTopic.containDb(dbName);
} else {
return false;
}
}
-
+
//extract DB id from JSON attributes, support multiple attributes
public String getMessageId(JSONObject json) {
String id = null;
-
- if(StringUtils.isNotBlank(messageIdPath)) {
- String[] paths=messageIdPath.split(",");
-
- StringBuilder sb= new StringBuilder();
- for(int i=0; i<paths.length; i++) {
- if(i>0) {
- sb.append('^');
+
+ if (StringUtils.isNotBlank(messageIdPath)) {
+ String[] paths = messageIdPath.split(",");
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < paths.length; i++) {
+ if (i > 0) {
+ sb.append('^');
}
- sb.append(json.query(paths[i]).toString());
+ sb.append(json.query(paths[i]).toString());
}
id = sb.toString();
}
-
+
return id;
}
-
+
@Override
public String toString() {
return name;
}
@Override
- public boolean equals(Object obj) {
- return name.equals(((Topic)obj).getName());
+ public boolean equals(Object obj) {
+ if (obj == null)
+ return false;
+
+ if (this.getClass() != obj.getClass())
+ return false;
+
+ return name.equals(((Topic) obj).getName());
}
@Override
public int hashCode() {
- return name.hashCode();
+ return name.hashCode();
}
-
+
}
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.CreateIndexResponse;
+import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
public void ensureTableExist(String topic) throws IOException {
String topicLower = topic.toLowerCase();
- GetIndexRequest request = new GetIndexRequest();
- request.indices(topicLower);
+ GetIndexRequest request = new GetIndexRequest(topicLower);
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
if(!exists){
CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
- }
+ }
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2018 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.bson.Document;
+
+import org.json.JSONObject;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Service to use MongoDB
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class MongodbService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DbService dbService;
+
+ private MongoDatabase database;
+ private MongoClient mongoClient;
+ private Map<String, MongoCollection<Document>> mongoCollectionMap = new HashMap<>();
+
+ @PostConstruct
+ private void init() {
+ Db mongodb = dbService.getMongoDB();
+
+ mongoClient = new MongoClient(mongodb.getHost(), mongodb.getPort());
+ database = mongoClient.getDatabase(mongodb.getProperty1());
+ }
+
+ @PreDestroy
+ public void cleanUp() {
+ mongoClient.close();
+ }
+
+ public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ List<Document> documents = new ArrayList<>(jsons.size());
+ for (JSONObject json : jsons) {
+ //convert org.json JSONObject to MongoDB Document
+ Document doc = Document.parse(json.toString());
+
+ String id = topic.getMessageId(json); //id can be null
+ if (id != null) {
+ doc.put("_id", id);
+ }
+ documents.add(doc);
+ }
+
+ String collectionName = topic.getName().replaceAll("[^a-zA-Z0-9]","");//remove - _ .
+ MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
+ collection.insertMany(documents);
+
+ log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments());
+ }
+
+}
@Autowired
private TopicService topicService;
+ @Autowired
+ private MongodbService mongodbService;
+
@Autowired
private CouchbaseService couchbaseService;
}
private void saveJsons(Topic topic, List<JSONObject> jsons) {
+ if (topic.supportMongoDB()) {
+ mongodbService.saveJsons(topic, jsons);
+ }
+
if (topic.supportCouchbase()) {
couchbaseService.saveJsons(topic, jsons);
}
public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = getDefaultTopic();
+ topic = new Topic(topicStr);
+ topic.setDefaultTopic(getDefaultTopic());
}
if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
- <mongojava.version>3.8.1</mongojava.version>
+ <mongojava.version>3.10.1</mongojava.version>
<springboot.version>2.1.0.RELEASE</springboot.version>
<springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
<jackson.version>2.9.6</jackson.version>
<kafka.version>2.0.0</kafka.version>
- <elasticsearchjava.version>6.7.0</elasticsearchjava.version>
+ <elasticsearchjava.version>7.0.0-rc2</elasticsearchjava.version>
</properties>