From 2920426bd0f8369a178895138e97b0b19372c413 Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Mon, 29 Apr 2019 17:05:39 -0700 Subject: [PATCH] Use TopicConfig in backend Issue-ID: DCAEGEN2-1200 Change-Id: Ia18993524c272c42ba48485a636f04f94af0cd0a Signed-off-by: Guobiao Mo --- .../feeder/src/assembly/scripts/init_db.sql | 7 +- .../feeder/config/ApplicationConfiguration.java | 1 + .../feeder/controller/TopicController.java | 25 +++-- .../org/onap/datalake/feeder/domain/Topic.java | 115 ++++----------------- .../org/onap/datalake/feeder/dto/TopicConfig.java | 96 ++++++++++++++--- .../datalake/feeder/service/CouchbaseService.java | 5 +- .../onap/datalake/feeder/service/DmaapService.java | 34 +++--- .../feeder/service/ElasticsearchService.java | 10 +- .../datalake/feeder/service/MongodbService.java | 7 +- .../onap/datalake/feeder/service/PullThread.java | 1 + .../onap/datalake/feeder/service/StoreService.java | 11 +- .../onap/datalake/feeder/service/TopicService.java | 17 +-- .../src/main/resources/application.properties | 2 + .../feeder/controller/TopicControllerTest.java | 4 +- .../org/onap/datalake/feeder/domain/TopicTest.java | 39 +------ .../onap/datalake/feeder/dto/TopicConfigTest.java | 100 ++++++++++++++++++ .../feeder/service/CouchbaseServiceTest.java | 4 +- .../feeder/service/ElasticsearchServiceTest.java | 2 +- .../feeder/service/MongodbServiceTest.java | 2 +- 19 files changed, 269 insertions(+), 213 deletions(-) create mode 100644 components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql index 44f4ef17..e201242d 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -11,10 +11,7 @@ CREATE TABLE `topic` ( `save_raw` bit(1) DEFAULT NULL, `ttl` int(11) DEFAULT NULL, `data_format` varchar(255) DEFAULT NULL, - `default_topic` varchar(255) DEFAULT NULL, - PRIMARY KEY (`name`), - KEY `FK_default_topic` (`default_topic`), - CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`) + PRIMARY KEY (`name`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; @@ -52,7 +49,7 @@ insert into db (`name`,`host`) values ('Druid','dl_druid'); -- in production, default enabled should be off insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON'); insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0); -insert into `topic`(`name`,correlate_cleared_message,`enabled`,default_topic, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'_DL_DEFAULT_', '/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem'); +insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem'); insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_'); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index a3add0ed..d59c0fc1 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -45,6 +45,7 @@ public class ApplicationConfiguration { private String dmaapKafkaHostPort; private String dmaapKafkaGroup; private long dmaapKafkaTimeout; + private String[] dmaapKafkaExclude; private int dmaapCheckNewTopicIntervalInSec; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java index f08a994d..d3a1fce3 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -55,10 +55,10 @@ import io.swagger.annotations.ApiOperation; /** * This controller manages topic settings. * - * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's - * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings - * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB - * script. + * Topic "_DL_DEFAULT_" acts as the default. + * If a topic is not present in database, "_DL_DEFAULT_" is used for it. + * If a topic is present in database, itself should be complete, and no setting from "_DL_DEFAULT_" is used. + * Topic "_DL_DEFAULT_" is populated at setup by a DB script. * * @author Guobiao Mo * @contributor Kate Hsuan @ QCT @@ -88,7 +88,7 @@ public class TopicController { @GetMapping("") @ResponseBody - @ApiOperation(value="List all topics in database") + @ApiOperation(value="List all topic names in database") public List list() { Iterable ret = topicRepository.findAll(); List retString = new ArrayList<>(); @@ -114,13 +114,11 @@ public class TopicController { sendError(response, 400, "Topic already exists "+topicConfig.getName()); return null; } else { - PostReturnBody retBody = new PostReturnBody<>(); Topic wTopic = topicService.fillTopicConfiguration(topicConfig); if(wTopic.getTtl() == 0) wTopic.setTtl(3650); - topicRepository.save(wTopic); - mkPostReturnBody(retBody, 200, wTopic); - return retBody; + topicRepository.save(wTopic); + return mkPostReturnBody(200, wTopic); } } @@ -159,18 +157,19 @@ public class TopicController { sendError(response, 404, "Topic not found "+topicConfig.getName()); return null; } else { - PostReturnBody retBody = new PostReturnBody<>(); topicService.fillTopicConfiguration(topicConfig, oldTopic); topicRepository.save(oldTopic); - mkPostReturnBody(retBody, 200, oldTopic); - return retBody; + return mkPostReturnBody(200, oldTopic); } } - private void mkPostReturnBody(PostReturnBody retBody, int statusCode, Topic topic) + private PostReturnBody mkPostReturnBody(int statusCode, Topic topic) { + PostReturnBody retBody = new PostReturnBody<>(); retBody.setStatusCode(statusCode); retBody.setReturnBody(topic.getTopicConfig()); + + return retBody; } private void sendError(HttpServletResponse response, int sc, String msg) throws IOException { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index 06c6b8cc..c618f57f 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -32,10 +32,7 @@ import javax.persistence.JoinTable; import javax.persistence.ManyToMany; import javax.persistence.Table; -import org.apache.commons.lang3.StringUtils; -import org.json.JSONObject; import org.onap.datalake.feeder.dto.TopicConfig; -import org.onap.datalake.feeder.enumeration.DataFormat; import com.fasterxml.jackson.annotation.JsonBackReference; @@ -54,11 +51,10 @@ import lombok.Setter; @Table(name = "topic") public class Topic { @Id - @Column(name="`name`") + @Column(name = "`name`") private String name;//topic name - - //for protected Kafka topics + //for protected Kafka topics @Column(name = "`login`") private String login; @@ -69,16 +65,13 @@ public class Topic { @JsonBackReference //@JsonManagedReference @ManyToMany(fetch = FetchType.EAGER) - @JoinTable( name = "map_db_topic", - joinColumns = { @JoinColumn(name="topic_name") }, - inverseJoinColumns = { @JoinColumn(name="db_name") } - ) + @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_name") }, inverseJoinColumns = { @JoinColumn(name = "db_name") }) protected Set dbs; /** * indicate if we should monitor this topic */ - @Column(name="`enabled`") + @Column(name = "`enabled`") private Boolean enabled; /** @@ -88,10 +81,10 @@ public class Topic { private Boolean saveRaw; /** - * need to explicitly tell feeder the data format of the message. - * support JSON, XML, YAML, TEXT + * need to explicitly tell feeder the data format of the message. support JSON, + * XML, YAML, TEXT */ - @Column(name="`data_format`") + @Column(name = "`data_format`") private String dataFormat; /** @@ -114,22 +107,6 @@ public class Topic { this.name = name; } - public Topic clone() { //TODO will use TopicConfig - Topic ret = new Topic(); - ret.setCorrelateClearedMessage(correlateClearedMessage); - ret.setDataFormat(dataFormat); - ret.setDbs(dbs); - ret.setEnabled(enabled); - ret.setLogin(login); - ret.setMessageIdPath(messageIdPath); - ret.setName(name); - ret.setPass(pass); - ret.setSaveRaw(saveRaw); - ret.setTtl(ttl); - - return ret; - } - public boolean isDefault() { return "_DL_DEFAULT_".equals(name); } @@ -145,19 +122,11 @@ public class Topic { public int getTtl() { if (ttl != null) { return ttl; - } else { + } else { return 3650;//default to 10 years for safe } } - public DataFormat getDataFormat() { - if (dataFormat != null) { - return DataFormat.fromString(dataFormat); - } else { - return null; - } - } - private boolean is(Boolean b) { return is(b, false); } @@ -165,7 +134,7 @@ public class Topic { private boolean is(Boolean b, boolean defaultValue) { if (b != null) { return b; - } else { + } else { return defaultValue; } } @@ -174,71 +143,25 @@ public class Topic { return is(saveRaw); } - public boolean supportElasticsearch() { - return containDb("Elasticsearch");//TODO string hard codes - } - - public boolean supportCouchbase() { - return containDb("Couchbase"); - } - - public boolean supportDruid() { - return containDb("Druid"); - } - - public boolean supportMongoDB() { - return containDb("MongoDB"); - } - - private boolean containDb(String dbName) { - Db db = new Db(dbName); - - if (dbs != null && dbs.contains(db)) { - return true; - } else { - return false; - } - } - - //extract DB id from JSON attributes, support multiple attributes - public String getMessageId(JSONObject json) { - String id = null; - - if (StringUtils.isNotBlank(messageIdPath)) { - String[] paths = messageIdPath.split(","); - - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < paths.length; i++) { - if (i > 0) { - sb.append('^'); - } - sb.append(json.query(paths[i]).toString()); - } - id = sb.toString(); - } - - return id; - } - public TopicConfig getTopicConfig() { TopicConfig tConfig = new TopicConfig(); - + tConfig.setName(getName()); - tConfig.setEnable(getEnabled()); - if(getDataFormat() != null) - tConfig.setDataFormat(getDataFormat().toString()); - tConfig.setSaveRaw(getSaveRaw()); - tConfig.setCorrelatedClearredMessage((getCorrelateClearedMessage() == null) ? getCorrelateClearedMessage() : false); + tConfig.setEnabled(isEnabled()); + tConfig.setDataFormat(dataFormat); + tConfig.setSaveRaw(isSaveRaw()); + tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage()); tConfig.setMessageIdPath(getMessageIdPath()); tConfig.setTtl(getTtl()); Set topicDb = getDbs(); List dbList = new ArrayList<>(); - for(Db item: topicDb) - { - dbList.add(item.getName()); + if (topicDb != null) { + for (Db item : topicDb) { + dbList.add(item.getName()); + } } tConfig.setSinkdbs(dbList); - + return tConfig; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index 76b41cb5..15ffc8a3 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -22,14 +22,12 @@ package org.onap.datalake.feeder.dto; import lombok.Getter; import lombok.Setter; -import org.onap.datalake.feeder.domain.Topic; -import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.repository.DbRepository; -import java.util.HashSet; import java.util.List; -import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONObject; +import org.onap.datalake.feeder.enumeration.DataFormat; /** * JSON request body for Topic manipulation. @@ -43,17 +41,85 @@ import java.util.Set; public class TopicConfig { - private String name; - private String login; - private String password; - private List sinkdbs; - private boolean enable; - private boolean saveRaw; - private String dataFormat; - private int ttl; - private boolean correlatedClearredMessage; - private String messageIdPath; + private String name; + private String login; + private String password; + private List sinkdbs; + private boolean enabled; + private boolean saveRaw; + private String dataFormat; + private int ttl; + private boolean correlateClearedMessage; + private String messageIdPath; + public DataFormat getDataFormat2() { + if (dataFormat != null) { + return DataFormat.fromString(dataFormat); + } else { + return null; + } + } + + public boolean supportElasticsearch() { + return containDb("Elasticsearch");//TODO string hard codes + } + + public boolean supportCouchbase() { + return containDb("Couchbase"); + } + + public boolean supportDruid() { + return containDb("Druid"); + } + + public boolean supportMongoDB() { + return containDb("MongoDB"); + } + + private boolean containDb(String dbName) { + return (sinkdbs != null && sinkdbs.contains(dbName)); + } + + //extract DB id from JSON attributes, support multiple attributes + public String getMessageId(JSONObject json) { + String id = null; + + if (StringUtils.isNotBlank(messageIdPath)) { + String[] paths = messageIdPath.split(","); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < paths.length; i++) { + if (i > 0) { + sb.append('^'); + } + sb.append(json.query(paths[i]).toString()); + } + id = sb.toString(); + } + + return id; + } + + @Override + public String toString() { + return name; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) + return false; + + if (this.getClass() != obj.getClass()) + return false; + + return name.equals(((TopicConfig) obj).getName()); + } + + @Override + public int hashCode() { + return name.hashCode(); + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java index 1e5fb78b..12d03ee6 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -30,6 +30,7 @@ import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.dto.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +91,7 @@ public class CouchbaseService { bucket.close(); } - public void saveJsons(Topic topic, List jsons) { + public void saveJsons(TopicConfig topic, List jsons) { List documents= new ArrayList<>(jsons.size()); for(JSONObject json : jsons) { //convert to Couchbase JsonObject from org.json JSONObject @@ -109,7 +110,7 @@ public class CouchbaseService { log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); } - public String getId(Topic topic, JSONObject json) { + public String getId(TopicConfig topic, JSONObject json) { //if this topic requires extract id from JSON String id = topic.getMessageId(json); if(id != null) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java index 270db932..de8c9e89 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -25,13 +25,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import javax.annotation.PostConstruct; - import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.onap.datalake.feeder.config.ApplicationConfiguration; -import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.dto.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -54,12 +52,6 @@ public class DmaapService { @Autowired private TopicService topicService; - - @PostConstruct - private void init() { - - } - //get all topic names from Zookeeper public List getTopics() { try { @@ -67,28 +59,32 @@ public class DmaapService { @Override public void process(WatchedEvent event) { // TODO monitor new topics - + } - }; + }; ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher); - List topics = zk.getChildren("/brokers/topics", false); - return topics; + List topics = zk.getChildren("/brokers/topics", false); + String[] excludes = config.getDmaapKafkaExclude(); + for (String exclude : excludes) { + topics.remove(exclude); + } + return topics; } catch (Exception e) { log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e); - return null; + return Collections.emptyList(); } - } + } - public List getActiveTopics() throws IOException { + public List getActiveTopics() throws IOException { List allTopics = getTopics(); - if(allTopics == null) { + if (allTopics == null) { return Collections.emptyList(); } List ret = new ArrayList<>(); for (String topicStr : allTopics) { - Topic topic = topicService.getEffectiveTopic(topicStr, true); - if (topic.isEnabled()) { + TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true); + if (topicConfig.isEnabled()) { ret.add(topicStr); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java index 30aa7332..4090e7eb 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -47,7 +47,7 @@ import org.elasticsearch.rest.RestStatus; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.dto.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,6 +112,7 @@ public class ElasticsearchService { boolean exists = client.indices().exists(request, RequestOptions.DEFAULT); if(!exists){ + //TODO submit mapping template CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged()); @@ -119,7 +120,7 @@ public class ElasticsearchService { } //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME - public void saveJsons(Topic topic, List jsons) { + public void saveJsons(TopicConfig topic, List jsons) { BulkRequest request = new BulkRequest(); for (JSONObject json : jsons) { @@ -134,6 +135,9 @@ public class ElasticsearchService { request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON)); } + + log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size()); + if(config.isAsync()) { client.bulkAsync(request, RequestOptions.DEFAULT, listener); }else { @@ -155,7 +159,7 @@ public class ElasticsearchService { * The search API can only query all data or based on the fields in the source. * So use the get API, three parameters: index, type, document id */ - private boolean correlateClearedMessage(Topic topic, JSONObject json) { + private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) { boolean found = false; String eName = null; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java index fb3f806c..02c80a45 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java @@ -34,7 +34,7 @@ import org.bson.Document; import org.json.JSONObject; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Db; -import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.dto.TopicConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +48,6 @@ import com.mongodb.MongoCredential; import com.mongodb.ServerAddress; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; -import com.mongodb.DB; /** * Service for using MongoDB @@ -131,7 +130,7 @@ public class MongodbService { mongoClient.close(); } - public void saveJsons(Topic topic, List jsons) { + public void saveJsons(TopicConfig topic, List jsons) { if(dbReady == false) return; List documents = new ArrayList<>(jsons.size()); @@ -150,7 +149,7 @@ public class MongodbService { MongoCollection collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k)); collection.insertMany(documents); - log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments()); + log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size()); } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java index ce671a90..b3a6d29a 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java @@ -82,6 +82,7 @@ public class PullThread implements Runnable { private void init() { async = config.isAsync(); Properties consumerConfig = getConsumerConfig(); + log.info("Kafka ConsumerConfig: {}", consumerConfig); consumer = new KafkaConsumer<>(consumerConfig); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index a4f79107..449dacfc 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -36,6 +36,7 @@ import org.json.JSONObject; import org.json.XML; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.enumeration.DataFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,7 +77,7 @@ public class StoreService { @Autowired private ElasticsearchService elasticsearchService; - private Map topicMap = new HashMap<>(); + private Map topicMap = new HashMap<>(); private ObjectMapper yamlReader; @@ -94,7 +95,7 @@ public class StoreService { return; } - Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically + TopicConfig topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically return topicService.getEffectiveTopic(topicStr); }); @@ -111,7 +112,7 @@ public class StoreService { saveJsons(topic, docs); } - private JSONObject messageToJson(Topic topic, Pair pair) throws JSONException, JsonParseException, JsonMappingException, IOException { + private JSONObject messageToJson(TopicConfig topic, Pair pair) throws JSONException, JsonParseException, JsonMappingException, IOException { long timestamp = pair.getLeft(); String text = pair.getRight(); @@ -126,7 +127,7 @@ public class StoreService { JSONObject json = null; - DataFormat dataFormat = topic.getDataFormat(); + DataFormat dataFormat = topic.getDataFormat2(); switch (dataFormat) { case JSON: @@ -160,7 +161,7 @@ public class StoreService { return json; } - private void saveJsons(Topic topic, List jsons) { + private void saveJsons(TopicConfig topic, List jsons) { if (topic.supportMongoDB()) { mongodbService.saveJsons(topic, jsons); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index 7ae3ff71..f0b000bc 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -60,7 +60,7 @@ public class TopicService { @Autowired private DbRepository dbRepository; - public Topic getEffectiveTopic(String topicStr) { + public TopicConfig getEffectiveTopic(String topicStr) { try { return getEffectiveTopic(topicStr, false); } catch (IOException e) { @@ -69,17 +69,18 @@ public class TopicService { return null; } - public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { + public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { Topic topic = getTopic(topicStr); if (topic == null) { - topic = getDefaultTopic().clone(); - topic.setName(topicStr); + topic = getDefaultTopic(); } + TopicConfig topicConfig = topic.getTopicConfig(); + topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic - if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { + if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) { elasticsearchService.ensureTableExist(topicStr); } - return topic; + return topicConfig; } public Topic getTopic(String topicStr) { @@ -116,10 +117,10 @@ public class TopicService { topic.setName(tConfig.getName()); topic.setLogin(tConfig.getLogin()); topic.setPass(tConfig.getPassword()); - topic.setEnabled(tConfig.isEnable()); + topic.setEnabled(tConfig.isEnabled()); topic.setSaveRaw(tConfig.isSaveRaw()); topic.setTtl(tConfig.getTtl()); - topic.setCorrelateClearedMessage(tConfig.isCorrelatedClearredMessage()); + topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage()); topic.setDataFormat(tConfig.getDataFormat()); topic.setMessageIdPath(tConfig.getMessageIdPath()); diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index dfe48a29..e1b83999 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -27,6 +27,8 @@ dmaapZookeeperHostPort=message-router-zookeeper:2181 dmaapKafkaHostPort=message-router-kafka:9092 dmaapKafkaGroup=dlgroup10 dmaapKafkaTimeout=60 +dmaapKafkaExclude[0]=__consumer_offsets +dmaapKafkaExclude[1]=msgrtr.apinode.metrics.dmaap #check for new topics dmaapCheckNewTopicIntervalInSec=3000 diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java index 7c2bf916..e96d940c 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java @@ -136,12 +136,12 @@ public class TopicControllerTest { when(topicRepository.findById("a")).thenReturn(Optional.of(a)); TopicConfig ac = new TopicConfig(); ac.setName("a"); - ac.setEnable(true); + ac.setEnabled(true); PostReturnBody postConfig1 = topicController.updateTopic("a", ac, mockBindingResult, httpServletResponse); assertEquals(200, postConfig1.getStatusCode()); TopicConfig ret = postConfig1.getReturnBody(); assertEquals("a", ret.getName()); - assertEquals(true, ret.isEnable()); + assertEquals(true, ret.isEnabled()); when(mockBindingResult.hasErrors()).thenReturn(true); PostReturnBody postConfig2 = topicController.updateTopic("a", ac, mockBindingResult, httpServletResponse); assertEquals(null, postConfig2); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java index b583473a..74f0884f 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java @@ -19,7 +19,6 @@ */ package org.onap.datalake.feeder.domain; -import org.json.JSONObject; import org.junit.Test; import org.onap.datalake.feeder.enumeration.DataFormat; @@ -27,7 +26,6 @@ import java.util.HashSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** @@ -38,35 +36,9 @@ import static org.junit.Assert.assertTrue; public class TopicTest { - @Test - public void getMessageId() { - String text = "{ data: { data2 : { value : 'hello'}}}"; - - JSONObject json = new JSONObject(text); - - Topic topic = new Topic("test getMessageId"); - topic.setMessageIdPath("/data/data2/value"); - - String value = topic.getMessageId(json); - - assertEquals(value, "hello"); - } - @Test public void getMessageIdFromMultipleAttributes() { - String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}"; - - JSONObject json = new JSONObject(text); - - Topic topic = new Topic("test getMessageId"); - topic.setMessageIdPath("/data/data2/value,/data/data3"); - - String value = topic.getMessageId(json); - assertEquals(value, "hello^world"); - - topic.setMessageIdPath(""); - assertNull(topic.getMessageId(json)); - + Topic topic = new Topic("test getMessageId"); Topic defaultTopic = new Topic("_DL_DEFAULT_"); Topic testTopic = new Topic("test"); @@ -99,13 +71,6 @@ public class TopicTest { defaultTopic.setDbs(new HashSet<>()); defaultTopic.getDbs().add(new Db("Elasticsearch")); - assertTrue(defaultTopic.supportElasticsearch()); - assertFalse(testTopic.supportCouchbase()); - assertFalse(testTopic.supportDruid()); - assertFalse(testTopic.supportMongoDB()); - - defaultTopic.getDbs().remove(new Db("Elasticsearch")); - assertFalse(testTopic.supportElasticsearch()); assertEquals(defaultTopic.getDataFormat(), null); defaultTopic.setCorrelateClearedMessage(true); @@ -116,7 +81,7 @@ public class TopicTest { assertTrue(defaultTopic.isEnabled()); assertTrue(defaultTopic.isSaveRaw()); - assertEquals(defaultTopic.getDataFormat(), DataFormat.XML); + assertEquals(defaultTopic.getTopicConfig().getDataFormat2(), DataFormat.XML); defaultTopic.setDataFormat(null); assertEquals(testTopic.getDataFormat(), null); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java new file mode 100644 index 00000000..c65e920e --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java @@ -0,0 +1,100 @@ +/* + * ============LICENSE_START======================================================= + * ONAP : DataLake + * ================================================================================ + * Copyright 2019 China Mobile + *================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.datalake.feeder.dto; + +import org.json.JSONObject; +import org.junit.Test; +import org.onap.datalake.feeder.domain.Db; +import org.onap.datalake.feeder.domain.Topic; + +import java.util.HashSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Test Topic + * + * @author Guobiao Mo + */ + +public class TopicConfigTest { + + @Test + public void getMessageId() { + String text = "{ data: { data2 : { value : 'hello'}}}"; + + JSONObject json = new JSONObject(text); + + Topic topic = new Topic("test getMessageId"); + topic.setMessageIdPath("/data/data2/value"); + + TopicConfig topicConfig = topic.getTopicConfig(); + + String value = topicConfig.getMessageId(json); + + assertEquals(value, "hello"); + } + + @Test + public void getMessageIdFromMultipleAttributes() { + String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}"; + + JSONObject json = new JSONObject(text); + + Topic topic = new Topic("test getMessageId"); + topic.setMessageIdPath("/data/data2/value,/data/data3"); + + TopicConfig topicConfig = topic.getTopicConfig(); + + String value = topicConfig.getMessageId(json); + assertEquals(value, "hello^world"); + + topic.setMessageIdPath(""); + topicConfig = topic.getTopicConfig(); + assertNull(topicConfig.getMessageId(json)); + + } + + @Test + public void testIs() { + Topic testTopic = new Topic("test"); + + assertTrue(testTopic.equals(new Topic("test"))); + assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode()); + + testTopic.setDbs(new HashSet<>()); + testTopic.getDbs().add(new Db("Elasticsearch")); + + TopicConfig testTopicConfig = testTopic.getTopicConfig(); + + assertTrue(testTopicConfig.supportElasticsearch()); + assertFalse(testTopicConfig.supportCouchbase()); + assertFalse(testTopicConfig.supportDruid()); + assertFalse(testTopicConfig.supportMongoDB()); + + testTopic.getDbs().remove(new Db("Elasticsearch")); + testTopicConfig = testTopic.getTopicConfig(); + assertFalse(testTopicConfig.supportElasticsearch()); + + } +} diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java index 9e1b2d99..0efde44c 100755 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java @@ -114,7 +114,7 @@ public class CouchbaseServiceTest { CouchbaseService couchbaseService = new CouchbaseService(); couchbaseService.bucket = bucket; couchbaseService.config = appConfig; - couchbaseService.saveJsons(topic, jsons); + couchbaseService.saveJsons(topic.getTopicConfig(), jsons); } @@ -134,7 +134,7 @@ public class CouchbaseServiceTest { CouchbaseService couchbaseService = new CouchbaseService(); couchbaseService.bucket = bucket; couchbaseService.config = appConfig; - couchbaseService.saveJsons(topic, jsons); + couchbaseService.saveJsons(topic.getTopicConfig(), jsons); } @Test diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java index de2c1674..9590b0a4 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java @@ -89,7 +89,7 @@ public class ElasticsearchServiceTest { when(config.getElasticsearchType()).thenReturn("doc"); when(config.isAsync()).thenReturn(true); - elasticsearchService.saveJsons(topic, jsons); + elasticsearchService.saveJsons(topic.getTopicConfig(), jsons); } } \ No newline at end of file diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java index 41856760..016381be 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java @@ -83,6 +83,6 @@ public class MongodbServiceTest { jsons.add(jsonObject); jsons.add(jsonObject2); - mongodbService.saveJsons(topic, jsons); + mongodbService.saveJsons(topic.getTopicConfig(), jsons); } } \ No newline at end of file -- 2.16.6