<version>1.0.0-SNAPSHOT</version>
</parent>
- <groupId>org.onap.datalake</groupId>
+ <groupId>org.onap.dcaegen2.services.components.datalake-handler</groupId>
<artifactId>feeder</artifactId>
<packaging>jar</packaging>
<name>DataLake Feeder</name>
<dependencies>
+
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
-
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-couchbase</artifactId>
+++ /dev/null
-version: '2'
-services:
-
- datalake:
- image: moguobiao/datalake-storage
- container_name: datalake-storage
- environment:
- - no-needed-dmaapHost=10.0.2.15:3904
- ports:
- - "1680:1680"
--- /dev/null
+create database datalake;\r
+use datalake;\r
+\r
+CREATE TABLE `topic` (\r
+ `name` varchar(255) NOT NULL,\r
+ `correlate_cleared_message` bit(1) DEFAULT NULL,\r
+ `enabled` bit(1) DEFAULT NULL,\r
+ `login` varchar(255) DEFAULT NULL,\r
+ `message_id_path` varchar(255) DEFAULT NULL,\r
+ `pass` varchar(255) DEFAULT NULL,\r
+ `save_raw` bit(1) DEFAULT NULL,\r
+ `ttl` int(11) DEFAULT NULL,\r
+ `data_format` varchar(255) DEFAULT NULL,\r
+ `default_topic` varchar(255) DEFAULT NULL,\r
+ PRIMARY KEY (`name`),\r
+ KEY `FK_default_topic` (`default_topic`),\r
+ CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`)\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
+\r
+\r
+CREATE TABLE `db` (\r
+ `name` varchar(255) NOT NULL,\r
+ `host` varchar(255) DEFAULT NULL,\r
+ `login` varchar(255) DEFAULT NULL,\r
+ `pass` varchar(255) DEFAULT NULL,\r
+ `property1` varchar(255) DEFAULT NULL,\r
+ `property2` varchar(255) DEFAULT NULL,\r
+ `property3` varchar(255) DEFAULT NULL,\r
+ PRIMARY KEY (`name`)\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
+\r
+\r
+CREATE TABLE `map_db_topic` (\r
+ `db_name` varchar(255) NOT NULL,\r
+ `topic_name` varchar(255) NOT NULL,\r
+ PRIMARY KEY (`db_name`,`topic_name`),\r
+ KEY `FK_topic_name` (`topic_name`),\r
+ CONSTRAINT `FK_topic_name` FOREIGN KEY (`topic_name`) REFERENCES `topic` (`name`),\r
+ CONSTRAINT `FK_db_name` FOREIGN KEY (`db_name`) REFERENCES `db` (`name`)\r
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
+\r
+\r
+insert into db (name,host,login,pass,property1) values ('Couchbase','dl_couchbase','dmaap','dmaap1234','dmaap');\r
+insert into db (name,host) values ('Elasticsearch','dl_es');\r
+insert into db (name,host) values ('MongoDB','dl_mongodb');\r
+insert into db (name,host) values ('Druid','dl_druid');\r
+\r
+\r
+-- in production, default enabled should be off\r
+insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');\r
+insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);\r
+\r
+\r
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAULT_'); \r
package org.onap.datalake.feeder.config;
-import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.FilterType;
-import org.springframework.context.annotation.ComponentScan;
import lombok.Getter;
import lombok.Setter;
@Setter
@SpringBootConfiguration
@ConfigurationProperties
-//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class))
-//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test
@EnableAutoConfiguration
-//@Profile("test")
public class ApplicationConfiguration {
- private String couchbaseHost;
- private String couchbaseUser;
- private String couchbasePass;
- private String couchbaseBucket;
-
- // private int mongodbPort;
- // private String mongodbDatabase;
-
private String dmaapZookeeperHostPort;
private String dmaapKafkaHostPort;
private String dmaapKafkaGroup;
private long dmaapKafkaTimeout;
-// private boolean dmaapMonitorAllTopics;
private int dmaapCheckNewTopicIntervalInSec;
- //private String dmaapHostPort;
- //private Set<String> dmaapExcludeTopics;
- //private Set<String> dmaapIncludeTopics;
private int kafkaConsumerCount;
private boolean async;
- private String elasticsearchHost;
}
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.controller;
+
+import java.io.IOException;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.service.DbService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * This controller manages the big data storage settings. All the settings are saved in database.
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@RestController
+@RequestMapping(value = "/dbs", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class DbController {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DbRepository dbRepository;
+
+ @Autowired
+ private DbService dbService;
+
+ //list all dbs
+ @GetMapping("/")
+ @ResponseBody
+ public Iterable<Db> list() throws IOException {
+ Iterable<Db> ret = dbRepository.findAll();
+ return ret;
+ }
+
+ //Read a db
+ //the topics are missing in the return, since in we use @JsonBackReference on Db's topics
+ //need to the the following method to retrieve the topic list
+ @GetMapping("/{name}")
+ @ResponseBody
+ public Db getDb(@PathVariable("name") String dbName) throws IOException {
+ Db db = dbService.getDb(dbName);
+ return db;
+ }
+
+ //Read topics in a DB
+ @GetMapping("/{name}/topics")
+ @ResponseBody
+ public Set<Topic> getDbTopics(@PathVariable("name") String dbName) throws IOException {
+ Db db = dbService.getDb(dbName);
+ Set<Topic> topics = db.getTopics();
+ return topics;
+ }
+
+ //Update Db
+ @PutMapping("/")
+ @ResponseBody
+ public Db updateDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DB: "+result.toString());
+ return null;
+ }
+
+ Db oldDb = getDb(db.getName());
+ if (oldDb == null) {
+ sendError(response, 404, "Db not found: "+db.getName());
+ return null;
+ } else {
+ dbRepository.save(db);
+ return db;
+ }
+ }
+
+ //create a new Db
+ @PostMapping("/")
+ @ResponseBody
+ public Db createDb(@RequestBody Db db, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing DB: "+result.toString());
+ return null;
+ }
+
+ Db oldDb = getDb(db.getName());
+ if (oldDb != null) {
+ sendError(response, 400, "Db already exists: "+db.getName());
+ return null;
+ } else {
+ dbRepository.save(db);
+ return db;
+ }
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
+}
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
*/
@RestController
-@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE })
-public class PullController {
+@RequestMapping(value = "/feeder", produces = { MediaType.TEXT_PLAIN_VALUE })
+public class FeederController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
* @return message that application is started
* @throws IOException
*/
- @RequestMapping("/start")
+ @GetMapping("/start")
public String start() throws IOException {
log.info("DataLake feeder starting to pull data from DMaaP...");
pullService.start();
/**
* @return message that application stop process is triggered
*/
- @RequestMapping("/stop")
+ @GetMapping("/stop")
public String stop() {
pullService.shutdown();
log.info("DataLake feeder is stopped.");
/**
* @return feeder status
*/
- @RequestMapping("/status")
+ @GetMapping("/status")
public String status() {
- String status = "to be impletemented";
+ String status = "Feeder is running: "+pullService.isRunning();
log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
return status;
}
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
+import java.util.Set;
+import javax.servlet.http.HttpServletResponse;
+
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.TopicRepository;
+import org.onap.datalake.feeder.service.DbService;
import org.onap.datalake.feeder.service.DmaapService;
+import org.onap.datalake.feeder.service.TopicService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
- * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as
- * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is
- * used for that topic. All the settings are saved in Couchbase. topic
- * "_DL_DEFAULT_" is populated at setup by a DB script.
+ * This controller manages topic settings.
+ *
+ * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is used for that topic.
+ * All the settings are saved in database.
+ * topic "_DL_DEFAULT_" is populated at setup by a DB script.
*
* @author Guobiao Mo
*
*/
@RestController
-@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })//, consumes= {MediaType.APPLICATION_JSON_UTF8_VALUE})
public class TopicController {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
private TopicRepository topicRepository;
+
+ @Autowired
+ private TopicService topicService;
+ @Autowired
+ private DbService dbService;
+
//list all topics in DMaaP
@GetMapping("/dmaap/")
@ResponseBody
}
//Read a topic
- @GetMapping("/{name}")
+ @GetMapping("/{topicname}")
@ResponseBody
- public Topic getTopic(@PathVariable("name") String topicName) throws IOException {
- //Topic topic = topicRepository.findFirstById(topicName);
- Optional<Topic> topic = topicRepository.findById(topicName);
- if (topic.isPresent()) {
- return topic.get();
- } else {
- return null;
- }
+ public Topic getTopic(@PathVariable("topicname") String topicName) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ return topic;
+ }
+
+ //Read DBs in a topic
+ @GetMapping("/{topicname}/dbs")
+ @ResponseBody
+ public Set<Db> getTopicDbs(@PathVariable("topicname") String topicName) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+ return dbs;
}
//Update Topic
+ //This is not a partial update: old topic is wiped out, and new topic is created base on the input json.
+ //One exception is that old DBs are kept
@PutMapping("/")
@ResponseBody
- public Topic updateTopic(Topic topic, BindingResult result) throws IOException {
+ public Topic updateTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
if (result.hasErrors()) {
- log.error(result.toString());
-
- return null;//TODO return binding error
+ sendError(response, 400, "Error parsing Topic: "+result.toString());
+ return null;
}
- Topic oldTopic = getTopic(topic.getId());
+ Topic oldTopic = getTopic(topic.getName());
if (oldTopic == null) {
- return null;//TODO return not found error
+ sendError(response, 404, "Topic not found "+topic.getName());
+ return null;
} else {
+ if(!topic.isDefault()) {
+ Topic defaultTopic = topicService.getDefaultTopic();
+ topic.setDefaultTopic(defaultTopic);
+ }
+
+ topic.setDbs(oldTopic.getDbs());
topicRepository.save(topic);
return topic;
}
//create a new Topic
@PostMapping("/")
@ResponseBody
- public Topic createTopic(Topic topic, BindingResult result) throws IOException {
-
+ public Topic createTopic(@RequestBody Topic topic, BindingResult result, HttpServletResponse response) throws IOException {
+
if (result.hasErrors()) {
- log.error(result.toString());
+ sendError(response, 400, "Error parsing Topic: "+result.toString());
return null;
}
- Topic oldTopic = getTopic(topic.getId());
+ Topic oldTopic = getTopic(topic.getName());
if (oldTopic != null) {
- return null;//TODO return 'already exists' error
+ sendError(response, 400, "Topic already exists "+topic.getName());
+ return null;
} else {
+ if(!topic.isDefault()) {
+ Topic defaultTopic = topicService.getDefaultTopic();
+ topic.setDefaultTopic(defaultTopic);
+ }
+
topicRepository.save(topic);
return topic;
}
}
+ //delete a db from the topic
+ @DeleteMapping("/{topicname}/db/{dbname}")
+ @ResponseBody
+ public Set<Db> deleteDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+ dbs.remove(new Db(dbName));
+
+ topicRepository.save(topic);
+ return topic.getDbs();
+ }
+
+ //add a db to the topic
+ @PutMapping("/{topicname}/db/{dbname}")
+ @ResponseBody
+ public Set<Db> addDb(@PathVariable("topicname") String topicName, @PathVariable("dbname") String dbName, HttpServletResponse response) throws IOException {
+ Topic topic = topicService.getTopic(topicName);
+ Set<Db> dbs = topic.getDbs();
+
+ Db db = dbService.getDb(dbName);
+ dbs.add(db);
+
+ topicRepository.save(topic);
+ return topic.getDbs();
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
}
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import java.util.Set;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.ManyToMany;
+import javax.persistence.Table;
+
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Domain class representing bid data storage
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Setter
+@Getter
+@Entity
+@Table(name = "db")
+public class Db {
+ @Id
+ private String name;
+
+ private String host;
+ private String login;
+ private String pass;
+
+ private String property1;
+ private String property2;
+ private String property3;
+
+ @JsonBackReference
+ @ManyToMany(mappedBy = "dbs", cascade=CascadeType.ALL)
+ /*
+ @ManyToMany(cascade=CascadeType.ALL)//, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="db_name") },
+ inverseJoinColumns = { @JoinColumn(name="topic_name") }
+ ) */
+ protected Set<Topic> topics;
+
+ public Db() {
+ }
+
+ public Db(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return name.equals(((Db)obj).getName());
+ }
+
+ @Override
+ public int hashCode() {
+ return name.hashCode();
+ }
+}
*/
package org.onap.datalake.feeder.domain;
+import java.util.Set;
import java.util.function.Predicate;
-import javax.validation.constraints.NotNull;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.JoinTable;
+import javax.persistence.ManyToMany;
+import javax.persistence.ManyToOne;
+import javax.persistence.Table;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.annotation.Transient;
-import org.springframework.data.couchbase.core.mapping.Document;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
import lombok.Setter;
/**
- * Domain class representing topic table in Couchbase
+ * Domain class representing topic
*
* @author Guobiao Mo
*
*/
-@Document
@Setter
+@Getter
+@Entity
+@Table(name = "topic")
public class Topic {
- @NotNull
@Id
- private String id;//topic name
+ private String name;//topic name
- @Transient
+ @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
+ @JoinColumn(name = "default_topic", nullable = true)
private Topic defaultTopic;
//for protected Kafka topics
private String login;
private String pass;
+ //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
+ @JsonBackReference
+ //@JsonManagedReference
+ @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="topic_name") },
+ inverseJoinColumns = { @JoinColumn(name="db_name") }
+ )
+ protected Set<Db> dbs;
+
/**
* indicate if we should monitor this topic
*/
/**
* save raw message text
*/
+ @Column(name = "save_raw")
private Boolean saveRaw;
/**
- * true: save it to Elasticsearch false: don't save null: use default
- */
- private Boolean supportElasticsearch;
- private Boolean supportCouchbase;
- private Boolean supportDruid;
-
- /**
- * need to explicitly tell feeder the data format of the message
+ * need to explicitly tell feeder the data format of the message.
* support JSON, XML, YAML, TEXT
*/
- private DataFormat dataFormat;
+ private String dataFormat;
/**
* TTL in day
private Integer ttl;
//if this flag is true, need to correlate alarm cleared message to previous alarm
+ @Column(name = "correlate_cleared_message")
private Boolean correlateClearedMessage;
//the value in the JSON with this path will be used as DB id
+ @Column(name = "message_id_path")
private String messageIdPath;
public Topic() {
}
- public Topic(String id) {
- this.id = id;
+ public Topic(String name) {
+ this.name = name;
}
- public String getId() {
- return id;
+ public boolean isDefault() {
+ return "_DL_DEFAULT_".equals(name);
}
- public void setDefaultTopic(Topic defaultTopic) {
- this.defaultTopic = defaultTopic;
- }
-
public boolean isEnabled() {
return is(enabled, Topic::isEnabled);
}
public DataFormat getDataFormat() {
if (dataFormat != null) {
- return dataFormat;
+ return DataFormat.fromString(dataFormat);
} else if (defaultTopic != null) {
return defaultTopic.getDataFormat();
} else {
return is(saveRaw, Topic::isSaveRaw);
}
- public boolean isSupportElasticsearch() {
- return is(supportElasticsearch, Topic::isSupportElasticsearch);
+ public boolean supportElasticsearch() {
+ return containDb("Elasticsearch");//TODO string hard codes
+ }
+
+ public boolean supportCouchbase() {
+ return containDb("Couchbase");
}
- public boolean isSupportCouchbase() {
- return is(supportCouchbase, Topic::isSupportCouchbase);
+ public boolean supportDruid() {
+ return containDb("Druid");
}
- public boolean isSupportDruid() {
- return is(supportDruid, Topic::isSupportDruid);
+ public boolean supportMongoDB() {
+ return containDb("MongoDB");
}
- //extract DB id from a JSON attribute, TODO support multiple attributes
+ private boolean containDb(String dbName) {
+ Db db = new Db(dbName);
+
+ if(dbs!=null && dbs.contains(db)) {
+ return true;
+ }
+
+ if (defaultTopic != null) {
+ return defaultTopic.containDb(dbName);
+ } else {
+ return false;
+ }
+ }
+
+ //extract DB id from JSON attributes, support multiple attributes
public String getMessageId(JSONObject json) {
String id = null;
if(StringUtils.isNotBlank(messageIdPath)) {
- id = json.query(messageIdPath).toString();
+ String[] paths=messageIdPath.split(",");
+
+ StringBuilder sb= new StringBuilder();
+ for(int i=0; i<paths.length; i++) {
+ if(i>0) {
+ sb.append('^');
+ }
+ sb.append(json.query(paths[i]).toString());
+ }
+ id = sb.toString();
}
return id;
@Override
public String toString() {
- return id;
+ return name;
}
- /**
- * @return the messageIdPath
- */
- public String getMessageIdPath() {
- return messageIdPath;
+ @Override
+ public boolean equals(Object obj) {
+ return name.equals(((Topic)obj).getName());
}
- /**
- * @param messageIdPath the messageIdPath to set
- */
- public void setMessageIdPath(String messageIdPath) {
- this.messageIdPath = messageIdPath;
+ @Override
+ public int hashCode() {
+ return name.hashCode();
}
+
}
*/\r
package org.onap.datalake.feeder.repository;\r
\r
+import org.onap.datalake.feeder.domain.Db;\r
+\r
+import org.springframework.data.repository.CrudRepository;\r
+\r
/**\r
+ * \r
+ * Db Repository \r
+ * \r
* @author Guobiao Mo\r
*\r
- */\r
-public interface TopicRepositoryCustom {\r
- long updateTopic(String topic, Boolean state);\r
+ */ \r
+\r
+public interface DbRepository extends CrudRepository<Db, String> {\r
+\r
}\r
package org.onap.datalake.feeder.repository;\r
\r
import org.onap.datalake.feeder.domain.Topic;\r
-import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;\r
-import org.springframework.data.couchbase.core.query.Query;\r
-import org.springframework.data.couchbase.core.query.ViewIndexed;\r
-import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository;\r
-import org.springframework.data.domain.Page;\r
-import org.springframework.data.domain.Pageable;\r
- \r
\r
-import java.util.List;\r
+import org.springframework.data.repository.CrudRepository;\r
\r
/**\r
* \r
- * Topic Repository interface, implementation is taken care by Spring framework.\r
- * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl. \r
+ * Topic Repository \r
* \r
* @author Guobiao Mo\r
*\r
- */\r
-@ViewIndexed(designDoc = "topic", viewName = "all")\r
-public interface TopicRepository extends CouchbasePagingAndSortingRepository<Topic, String>, TopicRepositoryCustom {\r
-/*\r
- Topic findFirstById(String topic);\r
-\r
- Topic findByIdAndState(String topic, boolean state);\r
-\r
- //Supports native JSON query string\r
- @Query("{topic:'?0'}")\r
- Topic findTopicById(String topic);\r
-\r
- @Query("{topic: { $regex: ?0 } })")\r
- List<Topic> findTopicByRegExId(String topic);\r
-\r
-\r
- //Page<Topic> findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable);\r
-\r
- @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}")\r
- Topic findByCompanyAndAreaId(String companyId, String areaId);\r
+ */ \r
\r
- @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END")\r
- List<Topic> findByPhoneNumber(String telephoneNumber);\r
+public interface TopicRepository extends CrudRepository<Topic, String> {\r
\r
- @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1")\r
- Long countBuildings(String companyId);\r
- */\r
}\r
+++ /dev/null
-/*\r
-* ============LICENSE_START=======================================================\r
-* ONAP : DataLake\r
-* ================================================================================\r
-* Copyright 2019 China Mobile\r
-*=================================================================================\r
-* Licensed under the Apache License, Version 2.0 (the "License");\r
-* you may not use this file except in compliance with the License.\r
-* You may obtain a copy of the License at\r
-*\r
-* http://www.apache.org/licenses/LICENSE-2.0\r
-*\r
-* Unless required by applicable law or agreed to in writing, software\r
-* distributed under the License is distributed on an "AS IS" BASIS,\r
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
-* See the License for the specific language governing permissions and\r
-* limitations under the License.\r
-* ============LICENSE_END=========================================================\r
-*/\r
-package org.onap.datalake.feeder.repository;\r
-\r
-import org.onap.datalake.feeder.domain.Topic;\r
-import org.springframework.beans.factory.annotation.Autowired;\r
-import org.springframework.data.couchbase.core.CouchbaseTemplate;\r
-/*\r
-import org.springframework.data.mongodb.MongoDbFactory;\r
-import org.springframework.data.mongodb.core.MongoTemplate;\r
-import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;\r
-import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;\r
-import org.springframework.data.mongodb.core.convert.MappingMongoConverter;\r
-import org.springframework.data.mongodb.core.mapping.MongoMappingContext;\r
-import org.springframework.data.mongodb.core.query.Criteria;\r
-import org.springframework.data.mongodb.core.query.Query;\r
-import org.springframework.data.mongodb.core.query.Update; \r
-\r
-import com.mongodb.WriteResult;\r
-import com.mongodb.client.result.UpdateResult;\r
-*/\r
-import java.util.List;\r
-\r
-/**\r
- * @author Guobiao Mo\r
- *\r
- */\r
-public class TopicRepositoryImpl implements TopicRepositoryCustom {\r
-\r
- @Autowired\r
- CouchbaseTemplate template;\r
- \r
- @Override\r
- public long updateTopic(String topic, Boolean state) {\r
-/*\r
- Query query = new Query(Criteria.where("id").is(topic));\r
- Update update = new Update();\r
- update.set("state", state);\r
-\r
- UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class);\r
-\r
- if(result!=null)\r
- return result.getModifiedCount();\r
- else\r
- */ return 0L;\r
- \r
- \r
- \r
- }\r
-}\r
import javax.annotation.PreDestroy;
import org.json.JSONObject;
-import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
- private ApplicationConfiguration config;
-
+ private DbService dbService;
+
Bucket bucket;
@PostConstruct
private void init() {
// Initialize Couchbase Connection
- Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost());
- cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass());
- bucket = cluster.openBucket(config.getCouchbaseBucket());
+
+ Db couchbase = dbService.getCouchbase();
+ Cluster cluster = CouchbaseCluster.create(couchbase.getHost());
+ cluster.authenticate(couchbase.getLogin(), couchbase.getPass());
+ bucket = cluster.openBucket(couchbase.getProperty1());
- log.info("Connect to Couchbase " + config.getCouchbaseHost());
+ log.info("Connect to Couchbase " + couchbase.getHost());
// Create a N1QL Primary Index (but ignore if it exists)
bucket.bucketManager().createN1qlPrimaryIndex(true, false);
//setup TTL
int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
- String id = getId(topic.getId());
+ String id = getId(topic, json);
JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
documents.add(doc);
}
saveDocuments(documents);
}
-
- private String getId(String topicStr) {
+ public String getId(Topic topic, JSONObject json) {
+ //if this topic requires extract id from JSON
+ String id = topic.getMessageId(json);
+ if(id != null) {
+ return id;
+ }
+
+ String topicStr= topic.getName();
//String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
//https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
// increment by 1, initialize at 0 if counter doc not found
//TODO how slow is this compared with above UUID approach?
JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
- String id = topicStr +":"+ nextIdNumber.content();
+ id = topicStr +":"+ nextIdNumber.content();
return id;
}
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.Optional;
+
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.repository.DbRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for Dbs
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class DbService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DbRepository dbRepository;
+
+ public Db getDb(String name) {
+ Optional<Db> ret = dbRepository.findById(name);
+ return ret.isPresent() ? ret.get() : null;
+ }
+
+ public Db getCouchbase() {
+ return getDb("Couchbase");
+ }
+
+ public Db getElasticsearch() {
+ return getDb("Elasticsearch");
+ }
+
+ public Db getMongoDB() {
+ return getDb("MongoDB");
+ }
+
+ public Db getDruid() {
+ return getDb("Druid");
+ }
+
+}
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Autowired
private ApplicationConfiguration config;
+
+ @Autowired
+ private DbService dbService;
private RestHighLevelClient client;
ActionListener<BulkResponse> listener;
@PostConstruct
private void init() {
- String elasticsearchHost = config.getElasticsearchHost();
-
+ Db elasticsearch = dbService.getElasticsearch();
+ String elasticsearchHost = elasticsearch.getHost();
+
// Initialize the Connection
client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
public void ensureTableExist(String topic) throws IOException {
String topicLower = topic.toLowerCase();
-
- CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
- try {
+
+ GetIndexRequest request = new GetIndexRequest();
+ request.indices(topicLower);
+
+ boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
+ if(!exists){
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
- }catch(ElasticsearchStatusException e) {
- log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());
- }
+ }
}
//TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
continue;
}
}
- request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
+
+ String id = topic.getMessageId(json); //id can be null
+
+ request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
}
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
try {
client.bulk(request, RequestOptions.DEFAULT);
} catch (IOException e) {
- log.error( topic.getId() , e);
+ log.error( topic.getName() , e);
}
}
}
private boolean correlateClearedMessage(JSONObject json) {
- boolean found = false;
-
+ boolean found = true;
+
/*TODO
* 1. check if this is a alarm cleared message
* 2. search previous alarm message
* 3. update previous message, if success, set found=true
*/
+ //for Sonar test, remove the following
+ if(json.isNull("kkkkk")) {
+ found = false;
+ }
return found;
}
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import javax.annotation.PostConstruct;
-
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Autowired
private ApplicationConfiguration config;
- @PostConstruct
- private void init() {
+ /**
+ * @return the isRunning
+ */
+ public boolean isRunning() {
+ return isRunning;
}
-
+
/**
* start pulling.
*
executorService.awaitTermination(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("executor.awaitTermination", e);
+ Thread.currentThread().interrupt();
}
isRunning = false;
}
private void saveJsons(Topic topic, List<JSONObject> jsons) {
- if (topic.isSupportCouchbase()) {
+ if (topic.supportCouchbase()) {
couchbaseService.saveJsons(topic, jsons);
}
- if (topic.isSupportElasticsearch()) {
+ if (topic.supportElasticsearch()) {
elasticsearchService.saveJsons(topic, jsons);
}
}
import org.springframework.stereotype.Service;
/**
- * Service for topics topic setting is stored in Couchbase, bucket 'dl', see
- * application.properties for Spring setup
+ * Service for topics
*
* @author Guobiao Mo
*
return null;
}
+ //TODO caller should not modify the returned topic, maybe return a clone
public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
Topic topic = getTopic(topicStr);
if (topic == null) {
- topic = new Topic(topicStr);
+ topic = getDefaultTopic();
}
-
- topic.setDefaultTopic(getDefaultTopic());
- if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) {
+ if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) {
elasticsearchService.ensureTableExist(topicStr);
}
return topic;
server.port = 1680
+# Spring connection to MariaDB for ORM
+#spring.jpa.hibernate.ddl-auto=update
+spring.jpa.hibernate.ddl-auto=none
+spring.jpa.show-sql=false
+
+#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+spring.datasource.url=jdbc:mariadb://dl_mariadb:3306/datalake?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8
+spring.datasource.username=nook
+spring.datasource.password=nook123
+
#For Beijing lab
#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
logging.level.com.att.nsa.apiClient.http=ERROR
logging.level.org.onap.datalake=DEBUG
-# Spring connection to Couchbase DB for ORM
-
-#not work when run in osboxes, but works in Eclipse
-spring.couchbase.bootstrap-hosts=dl_couchbase
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-
-#a user with name as bucket.name must be created, with the pass as bucket.password
-# https://stackoverflow.com/questions/51496589/bucket-password-in-couchbase
-spring.couchbase.bucket.name=dl
-spring.couchbase.bucket.password=dl1234
-spring.data.couchbase.auto-index=true
-
-#DL Feeder DB: Couchbase
-couchbaseHost=dl_couchbase
-#couchbaseHost=172.30.1.74
-couchbaseUser=dmaap
-couchbasePass=dmaap1234
-couchbaseBucket=dmaap
-
-#DL Feeder DB: Elasticsearch
-elasticsearchHost=dl_es
"taskDuration": "PT1H",
"completionTimeout": "PT30M",
"consumerProperties": {
- "bootstrap.servers": "dl_dmaap:9092"
+ "bootstrap.servers": "message-router-kafka:9092"
},
"useEarliestOffset": true
}
--- /dev/null
+{\r
+ "_id": "5bb3dfae5bea3f1cb49d4f3f",\r
+ "cambria.partition": "AAI",\r
+ "event-header": {\r
+ "severity": "NORMAL",\r
+ "entity-type": "esr-thirdparty-sdnc",\r
+ "top-entity-type": "esr-thirdparty-sdnc",\r
+ "entity-link": "/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1",\r
+ "event-type": "AAI-EVENT",\r
+ "domain": "dev",\r
+ "action": "CREATE",\r
+ "sequence-number": "0",\r
+ "id": "69f2935f-c3c1-4a63-b3f1-519c3898328b",\r
+ "source-name": "ying",\r
+ "version": "v11",\r
+ "timestamp": "20180919-06:20:44:216"\r
+ },\r
+ "entity": {\r
+ "thirdparty-sdnc-id": "SDWANController1",\r
+ "resource-version": "1537338043473",\r
+ "location": "Core",\r
+ "product-name": "SD-WAN",\r
+ "esr-system-info-list": {\r
+ "esr-system-info": [\r
+ {\r
+ "esr-system-info-id": "SDWANController1-ESR-1",\r
+ "system-type": "example-system-type-val-12078",\r
+ "service-url": "https://172.19.48.77:18008",\r
+ "ssl-cacert": "example-ssl-cacert-val-20589",\r
+ "type": "WAN",\r
+ "ssl-insecure": true,\r
+ "system-status": "example-system-status-val-23435",\r
+ "version": "V3R1",\r
+ "passive": true,\r
+ "password": "Onap@12345",\r
+ "protocol": "RESTCONF",\r
+ "ip-address": "172.19.48.77",\r
+ "cloud-domain": "example-cloud-domain-val-76077",\r
+ "user-name": "northapi1@huawei.com",\r
+ "system-name": "SDWANController",\r
+ "port": "18008",\r
+ "vendor": "IP-WAN",\r
+ "resource-version": "1537338044166",\r
+ "remote-path": "example-remotepath-val-5833",\r
+ "default-tenant": "example-default-tenant-val-71148"\r
+ }\r
+ ]\r
+ }\r
+ },\r
+ "_dl_type_": "JSON",\r
+ "_dl_text_": "{\"cambria.partition\":\"AAI\",\"event-header\":{\"severity\":\"NORMAL\",\"entity-type\":\"esr-thirdparty-sdnc\",\"top-entity-type\":\"esr-thirdparty-sdnc\",\"entity-link\":\"/aai/v11/external-system/esr-thirdparty-sdnc-list/esr-thirdparty-sdnc/SDWANController1\",\"event-type\":\"AAI-EVENT\",\"domain\":\"dev\",\"action\":\"CREATE\",\"sequence-number\":\"0\",\"id\":\"69f2935f-c3c1-4a63-b3f1-519c3898328b\",\"source-name\":\"ying\",\"version\":\"v11\",\"timestamp\":\"20180919-06:20:44:216\"},\"entity\":{\"thirdparty-sdnc-id\":\"SDWANController1\",\"resource-version\":\"1537338043473\",\"location\":\"Core\",\"product-name\":\"SD-WAN\",\"esr-system-info-list\":{\"esr-system-info\":[{\"esr-system-info-id\":\"SDWANController1-ESR-1\",\"system-type\":\"example-system-type-val-12078\",\"service-url\":\"https://172.19.48.77:18008\",\"ssl-cacert\":\"example-ssl-cacert-val-20589\",\"type\":\"WAN\",\"ssl-insecure\":true,\"system-status\":\"example-system-status-val-23435\",\"version\":\"V3R1\",\"passive\":true,\"password\":\"Onap@12345\",\"protocol\":\"RESTCONF\",\"ip-address\":\"172.19.48.77\",\"cloud-domain\":\"example-cloud-domain-val-76077\",\"user-name\":\"northapi1@huawei.com\",\"system-name\":\"SDWANController\",\"port\":\"18008\",\"vendor\":\"IP-WAN\",\"resource-version\":\"1537338044166\",\"remote-path\":\"example-remotepath-val-5833\",\"default-tenant\":\"example-default-tenant-val-71148\"}]}}}"\r
+}
\ No newline at end of file
"taskDuration": "PT1H",
"completionTimeout": "PT30M",
"consumerProperties": {
- "bootstrap.servers": "dl_dmaap:9092"
+ "bootstrap.servers": "message-router-kafka:9092"
},
"useEarliestOffset": true
}
"taskDuration": "PT1H",
"completionTimeout": "PT30M",
"consumerProperties": {
- "bootstrap.servers": "dl_dmaap:9092"
+ "bootstrap.servers": "message-router-kafka:9092"
},
"useEarliestOffset": true
}
"taskDuration": "PT1H",
"completionTimeout": "PT30M",
"consumerProperties": {
- "bootstrap.servers": "dl_dmaap:9092"
+ "bootstrap.servers": "message-router-kafka:9092"
},
"useEarliestOffset": true
}
--- /dev/null
+{\r
+ "_id": {\r
+ "$oid": "5bb3d3aa5bea3f41300957c6"\r
+ },\r
+ "sendEpsShort": {\r
+ "summary": "0.0 send eps (10 mins)",\r
+ "raw": 0\r
+ },\r
+ "recvEpsInstant": {\r
+ "summary": "0.03333333333333333 recv eps (1 min)",\r
+ "raw": 0.03333333333333333\r
+ },\r
+ "fanOut": {\r
+ "summary": "0.7051873815491838 sends per recv",\r
+ "raw": 0.7051873815491838\r
+ },\r
+ "sendEpsLong": {\r
+ "summary": "0.0 send eps (1 hr)",\r
+ "raw": 0\r
+ },\r
+ "kafkaConsumerTimeouts": {\r
+ "summary": "164 Kafka Consumers Timedout",\r
+ "raw": 164\r
+ },\r
+ "recvEpsLong": {\r
+ "summary": "0.03333333333333333 recv eps (1 hr)",\r
+ "raw": 0.03333333333333333\r
+ },\r
+ "sendEpsInstant": {\r
+ "summary": "0.0 send eps (1 min)",\r
+ "raw": 0\r
+ },\r
+ "recvEpsShort": {\r
+ "summary": "0.03333333333333333 recv eps (10 mins)",\r
+ "raw": 0.03333333333333333\r
+ },\r
+ "kafkaConsumerClaims": {\r
+ "summary": "1 Kafka Consumers Claimed",\r
+ "raw": 1\r
+ },\r
+ "version": {\r
+ "summary": "Version 1.1.3",\r
+ "raw": 0\r
+ },\r
+ "upTime": {\r
+ "summary": "604800 seconds since start",\r
+ "raw": 604800\r
+ },\r
+ "sendTotalEvents": {\r
+ "summary": "46139 Total events sent since start",\r
+ "raw": 46139\r
+ },\r
+ "hostname": "3d5704fccbc5",\r
+ "kafkaConsumerCacheMiss": {\r
+ "summary": "179 Kafka Consumer Cache Misses",\r
+ "raw": 179\r
+ },\r
+ "metricsSendTime": "1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC",\r
+ "kafkaConsumerCacheHit": {\r
+ "summary": "317143 Kafka Consumer Cache Hits",\r
+ "raw": 317143\r
+ },\r
+ "now": 1537639709380,\r
+ "transactionEnabled": false,\r
+ "startTime": {\r
+ "summary": "1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC",\r
+ "raw": 1537034908\r
+ },\r
+ "recvTotalEvents": {\r
+ "summary": "65428 Total events received since start",\r
+ "raw": 65428\r
+ },\r
+ "_dl_type_": "JSON",\r
+ "_dl_text_": "{\"sendEpsShort\":{\"summary\":\"0.0 send eps (10 mins)\",\"raw\":0},\"recvEpsInstant\":{\"summary\":\"0.03333333333333333 recv eps (1 min)\",\"raw\":0.03333333333333333},\"fanOut\":{\"summary\":\"0.7051873815491838 sends per recv\",\"raw\":0.7051873815491838},\"sendEpsLong\":{\"summary\":\"0.0 send eps (1 hr)\",\"raw\":0},\"kafkaConsumerTimeouts\":{\"summary\":\"164 Kafka Consumers Timedout\",\"raw\":164},\"recvEpsLong\":{\"summary\":\"0.03333333333333333 recv eps (1 hr)\",\"raw\":0.03333333333333333},\"sendEpsInstant\":{\"summary\":\"0.0 send eps (1 min)\",\"raw\":0},\"recvEpsShort\":{\"summary\":\"0.03333333333333333 recv eps (10 mins)\",\"raw\":0.03333333333333333},\"kafkaConsumerClaims\":{\"summary\":\"1 Kafka Consumers Claimed\",\"raw\":1},\"version\":{\"summary\":\"Version 1.1.3\",\"raw\":0},\"upTime\":{\"summary\":\"604800 seconds since start\",\"raw\":604800},\"sendTotalEvents\":{\"summary\":\"46139 Total events sent since start\",\"raw\":46139},\"hostname\":\"3d5704fccbc5\",\"kafkaConsumerCacheMiss\":{\"summary\":\"179 Kafka Consumer Cache Misses\",\"raw\":179},\"metricsSendTime\":\"1537639709 Metrics Send Time (epoch); 2018-09-22T18:08:29UTC\",\"kafkaConsumerCacheHit\":{\"summary\":\"317143 Kafka Consumer Cache Hits\",\"raw\":317143},\"now\":1537639709380,\"transactionEnabled\":false,\"startTime\":{\"summary\":\"1537034908 Start Time (epoch); 2018-09-15T18:08:28UTC\",\"raw\":1537034908},\"recvTotalEvents\":{\"summary\":\"65428 Total events received since start\",\"raw\":65428}}"\r
+}\r
--- /dev/null
+{\r
+ "_id": "5bb3d3ad5bea3f41300959ba",\r
+ "closedLoopEventClient": "DCAE.HolmesInstance",\r
+ "policyVersion": "1.0.0.5",\r
+ "policyName": "CCVPN",\r
+ "policyScope": "service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8",\r
+ "target_type": "VM",\r
+ "AAI": {\r
+ "serviceType": "TestService",\r
+ "service-instance_service-instance-id": "200",\r
+ "globalSubscriberId": "Customer1",\r
+ "vserver_vserver-name": "TBD",\r
+ "network-information_network-id": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200"\r
+ },\r
+ "closedLoopAlarmStart": "1532769303924000",\r
+ "closedLoopEventStatus": "ONSET",\r
+ "version": "1.0.2",\r
+ "closedLoopControlName": "ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b",\r
+ "target": "vserver.vserver-name",\r
+ "closedLoopAlarmEnd": "1532769303924000",\r
+ "requestID": "6f455b14-efd9-450a-bf78-e47d55b6da87",\r
+ "from": "DCAE",\r
+ "_dl_type_": "JSON",\r
+ "_dl_text_": "{\"closedLoopEventClient\":\"DCAE.HolmesInstance\",\"policyVersion\":\"1.0.0.5\",\"policyName\":\"CCVPN\",\"policyScope\":\"service=SOTNService,type=SampleType,closedLoopControlName=CL-CCVPN-d925ed73-8231-4d02-9545-db4e101f88f8\",\"target_type\":\"VM\",\"AAI\":{\"serviceType\":\"TestService\",\"service-instance.service-instance-id\":\"200\",\"globalSubscriberId\":\"Customer1\",\"vserver.vserver-name\":\"TBD\",\"network-information.network-id\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F200\"},\"closedLoopAlarmStart\":1532769303924000,\"closedLoopEventStatus\":\"ONSET\",\"version\":\"1.0.2\",\"closedLoopControlName\":\"ControlLoop-CCVPN-2179b738-fd36-4843-a71a-a8c24c70c66b\",\"target\":\"vserver.vserver-name\",\"closedLoopAlarmEnd\":1532769303924000,\"requestID\":\"6f455b14-efd9-450a-bf78-e47d55b6da87\",\"from\":\"DCAE\"}"\r
+}
\ No newline at end of file
--- /dev/null
+{\r
+ "_id": "5bb3d3b45bea3f41300960f8",\r
+ "event": {\r
+ "commonEventHeader": {\r
+ "sourceId": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",\r
+ "startEpochMicrosec": "1537438335829000",\r
+ "eventId": "2ef8b41b-b081-477b-9d0b-1aaaa3b69857",\r
+ "domain": "fault",\r
+ "lastEpochMicrosec": 1537438335829000,\r
+ "eventName": "Fault_Route_Status",\r
+ "sourceName": "/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27",\r
+ "priority": "High",\r
+ "version": 3,\r
+ "reportingEntityName": "Domain_Contorller"\r
+ },\r
+ "faultFields": {\r
+ "eventSeverity": "CRITICAL",\r
+ "alarmCondition": "Route_Status",\r
+ "faultFieldsVersion": 2,\r
+ "specificProblem": "Fault_SOTN_Service_Status",\r
+ "alarmAdditionalInformation": [\r
+ {\r
+ "name": "networkId",\r
+ "value": "providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100"\r
+ },\r
+ {\r
+ "name": "node",\r
+ "value": "11.11.11.11"\r
+ },\r
+ {\r
+ "name": "tp-id",\r
+ "value": "27"\r
+ },\r
+ {\r
+ "name": "oper-status",\r
+ "value": "down"\r
+ },\r
+ {\r
+ "name": "network-ref",\r
+ "value": "providerId/5555/clientId/6666/topologyId/33"\r
+ },\r
+ {\r
+ "name": "node-ref",\r
+ "value": "0.51.0.103"\r
+ },\r
+ {\r
+ "name": "tp-ref",\r
+ "value": "4"\r
+ }\r
+ ],\r
+ "eventSourceType": "other",\r
+ "vfStatus": "Active"\r
+ }\r
+ },\r
+ "_dl_type_": "JSON",\r
+ "_dl_text_": "{\"event\":{\"commonEventHeader\":{\"sourceId\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"startEpochMicrosec\":1537438335829000,\"eventId\":\"2ef8b41b-b081-477b-9d0b-1aaaa3b69857\",\"domain\":\"fault\",\"lastEpochMicrosec\":1537438335829000,\"eventName\":\"Fault_Route_Status\",\"sourceName\":\"/networks/network=providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100/node=11.11.11.11/termination-point=27\",\"priority\":\"High\",\"version\":3,\"reportingEntityName\":\"Domain_Contorller\"},\"faultFields\":{\"eventSeverity\":\"CRITICAL\",\"alarmCondition\":\"Route_Status\",\"faultFieldsVersion\":2,\"specificProblem\":\"Fault_SOTN_Service_Status\",\"alarmAdditionalInformation\":[{\"name\":\"networkId\",\"value\":\"providerId%2F5555%2FclientId%2F6666%2FtopologyId%2F100\"},{\"name\":\"node\",\"value\":\"11.11.11.11\"},{\"name\":\"tp-id\",\"value\":\"27\"},{\"name\":\"oper-status\",\"value\":\"down\"},{\"name\":\"network-ref\",\"value\":\"providerId/5555/clientId/6666/topologyId/33\"},{\"name\":\"node-ref\",\"value\":\"0.51.0.103\"},{\"name\":\"tp-ref\",\"value\":\"4\"}],\"eventSourceType\":\"other\",\"vfStatus\":\"Active\"}}}"\r
+}
\ No newline at end of file
@Test
public void readConfig() {
- assertNotNull(config.getCouchbaseHost());
- assertNotNull(config.getCouchbaseUser());
- assertNotNull(config.getCouchbasePass());
- assertNotNull(config.getCouchbaseBucket());
assertNotNull(config.getDmaapZookeeperHostPort());
assertNotNull(config.getDmaapKafkaHostPort());
--- /dev/null
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import org.junit.Test;
+
+/**
+ * Test Db
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+public class DbTest {
+
+ @Test
+ public void testIs() {
+ Db couchbase=new Db("Couchbase");
+ Db mongoDB=new Db("MongoDB");
+ Db mongoDB2=new Db("MongoDB");
+ assertNotEquals(couchbase.hashCode(), mongoDB.hashCode());
+ assertNotEquals(couchbase, mongoDB);
+ assertEquals(mongoDB, mongoDB2);
+ }
+}
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.util.HashSet;
+
import org.json.JSONObject;
-import org.junit.Test;
+import org.junit.Test;
+import org.onap.datalake.feeder.enumeration.DataFormat;
/**
* Test Topic
assertEquals(value, "hello");
}
+ @Test
+ public void getMessageIdFromMultipleAttributes() {
+ String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}";
+
+ JSONObject json = new JSONObject(text);
+
+ Topic topic = new Topic("test getMessageId");
+ topic.setMessageIdPath("/data/data2/value,/data/data3");
+
+ String value = topic.getMessageId(json);
+
+ assertEquals(value, "hello^world");
+ }
+
@Test
public void testIs() {
- Topic defaultTopic=new Topic("default");
+ Topic defaultTopic=new Topic("_DL_DEFAULT_");
Topic testTopic = new Topic("test");
testTopic.setDefaultTopic(defaultTopic);
+
+ assertTrue(defaultTopic.isDefault());
+ assertFalse(testTopic.isDefault());
+
+ assertTrue(testTopic.equals(new Topic("test")));
+ assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
+
+ defaultTopic.setDbs(new HashSet<>());
+ defaultTopic.getDbs().add(new Db("Elasticsearch"));
+ assertTrue(testTopic.supportElasticsearch());
+ assertFalse(testTopic.supportCouchbase());
+ assertFalse(testTopic.supportDruid());
+ assertFalse(testTopic.supportMongoDB());
+
+ defaultTopic.getDbs().remove(new Db("Elasticsearch"));
+ assertFalse(testTopic.supportElasticsearch());
- defaultTopic.setSupportElasticsearch(true);
- boolean b = testTopic.isSupportElasticsearch();
- assertTrue(b);
+ defaultTopic.setCorrelateClearedMessage(true);
+ defaultTopic.setDataFormat("XML");
+ defaultTopic.setEnabled(true);
+ defaultTopic.setSaveRaw(true);
+ assertTrue(testTopic.isCorrelateClearedMessage());
+ assertTrue(testTopic.isEnabled());
+ assertTrue(testTopic.isSaveRaw());
- defaultTopic.setSupportElasticsearch(false);
- b = testTopic.isSupportElasticsearch();
- assertFalse(b);
+ assertEquals(defaultTopic.getDataFormat(), DataFormat.XML);
}
}
server.port = 1680
-
-#For Beijing lab
-#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80
-#dmaapKafkaHostPort=kafka.mr01.onap.vip:80
-#spring.couchbase.bootstrap-hosts=172.30.1.74
-#couchbaseHost=172.30.1.74
-
-
#DMaaP
#dmaapZookeeperHostPort=127.0.0.1:2181
#dmaapKafkaHostPort=127.0.0.1:9092
logging.level.com.att.nsa.apiClient.http=ERROR
logging.level.org.onap.datalake=DEBUG
-
-#DL Feeder DB: Couchbase
-couchbaseHost=dl_couchbase
-#couchbaseHost=172.30.1.74
-couchbaseUser=dmaap
-couchbasePass=dmaap1234
-couchbaseBucket=dmaap
-
-#DL Feeder DB: Elasticsearch
-elasticsearchHost=dl_es
+
<springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
<jackson.version>2.9.6</jackson.version>
<kafka.version>2.0.0</kafka.version>
- <elasticsearchjava.version>6.6.0</elasticsearchjava.version>
+ <elasticsearchjava.version>6.7.0</elasticsearchjava.version>
</properties>
<dependencyManagement>
<dependencies>
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>2.4.1</version>
+ </dependency>
+
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${springboot.version}</version>
</dependency>
<!-- end::actuator[] -->
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ <version>${springboot.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-couchbase</artifactId>