Use TopicConfig in backend 73/86573/2 1.0.0 4.0.0-ONAP
authorGuobiao Mo <guobiaomo@chinamobile.com>
Tue, 30 Apr 2019 00:05:39 +0000 (17:05 -0700)
committerGuobiao Mo <guobiaomo@chinamobile.com>
Tue, 30 Apr 2019 03:59:56 +0000 (20:59 -0700)
Issue-ID: DCAEGEN2-1200
Change-Id: Ia18993524c272c42ba48485a636f04f94af0cd0a
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
19 files changed:
components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/MongodbService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
components/datalake-handler/feeder/src/main/resources/application.properties
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/controller/TopicControllerTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java [new file with mode: 0644]
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/CouchbaseServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/ElasticsearchServiceTest.java
components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/MongodbServiceTest.java

index 44f4ef1..e201242 100644 (file)
@@ -11,10 +11,7 @@ CREATE TABLE `topic` (
   `save_raw` bit(1) DEFAULT NULL,\r
   `ttl` int(11) DEFAULT NULL,\r
   `data_format` varchar(255) DEFAULT NULL,\r
-  `default_topic` varchar(255) DEFAULT NULL,\r
-  PRIMARY KEY (`name`),\r
-  KEY `FK_default_topic` (`default_topic`),\r
-  CONSTRAINT `FK_default_topic` FOREIGN KEY (`default_topic`) REFERENCES `topic` (`name`)\r
+  PRIMARY KEY (`name`)\r
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;\r
 \r
 \r
@@ -52,7 +49,7 @@ insert into db (`name`,`host`) values ('Druid','dl_druid');
 -- in production, default enabled should be off\r
 insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');\r
 insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);\r
-insert into `topic`(`name`,correlate_cleared_message,`enabled`,default_topic, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'_DL_DEFAULT_', '/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');\r
+insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');\r
 \r
 \r
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');\r
index a3add0e..d59c0fc 100644 (file)
@@ -45,6 +45,7 @@ public class ApplicationConfiguration {
        private String dmaapKafkaHostPort;
        private String dmaapKafkaGroup;
        private long dmaapKafkaTimeout;
+       private String[] dmaapKafkaExclude;
 
        private int dmaapCheckNewTopicIntervalInSec;
 
index f08a994..d3a1fce 100644 (file)
@@ -55,10 +55,10 @@ import io.swagger.annotations.ApiOperation;
 /**
  * This controller manages topic settings.
  * 
- * Topic "_DL_DEFAULT_" acts as the default. For example, if a topic's
- * enabled=null, _DL_DEFAULT_.enabled is used for that topic. All the settings
- * are saved in database. topic "_DL_DEFAULT_" is populated at setup by a DB
- * script.
+ * Topic "_DL_DEFAULT_" acts as the default. 
+ * If a topic is not present in database, "_DL_DEFAULT_" is used for it.
+ * If a topic is present in database, itself should be complete, and no setting from "_DL_DEFAULT_" is used.
+ * Topic "_DL_DEFAULT_" is populated at setup by a DB script.
  * 
  * @author Guobiao Mo
  * @contributor Kate Hsuan @ QCT
@@ -88,7 +88,7 @@ public class TopicController {
 
        @GetMapping("")
        @ResponseBody
-       @ApiOperation(value="List all topics in database")
+       @ApiOperation(value="List all topic names in database")
        public List<String> list() {
                Iterable<Topic> ret = topicRepository.findAll();
                List<String> retString = new ArrayList<>();
@@ -114,13 +114,11 @@ public class TopicController {
                        sendError(response, 400, "Topic already exists "+topicConfig.getName());
                        return null;
                } else {
-                       PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
                        Topic wTopic = topicService.fillTopicConfiguration(topicConfig);
                        if(wTopic.getTtl() == 0)
                                wTopic.setTtl(3650);
-                       topicRepository.save(wTopic);
-                       mkPostReturnBody(retBody, 200, wTopic);
-                       return retBody;
+                       topicRepository.save(wTopic); 
+                       return mkPostReturnBody(200, wTopic);
                }
        }
 
@@ -159,18 +157,19 @@ public class TopicController {
                        sendError(response, 404, "Topic not found "+topicConfig.getName());
                        return null;
                } else {
-                       PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
                        topicService.fillTopicConfiguration(topicConfig, oldTopic);
                        topicRepository.save(oldTopic);
-                       mkPostReturnBody(retBody, 200, oldTopic);
-                       return retBody;
+                       return mkPostReturnBody(200, oldTopic);
                }
        }
 
-       private void mkPostReturnBody(PostReturnBody<TopicConfig> retBody, int statusCode, Topic topic)
+       private PostReturnBody<TopicConfig> mkPostReturnBody(int statusCode, Topic topic)
        {
+               PostReturnBody<TopicConfig> retBody = new PostReturnBody<>();
         retBody.setStatusCode(statusCode);
         retBody.setReturnBody(topic.getTopicConfig());
+        
+        return retBody;
        }
        
        private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
index 06c6b8c..c618f57 100644 (file)
@@ -32,10 +32,7 @@ import javax.persistence.JoinTable;
 import javax.persistence.ManyToMany;
 import javax.persistence.Table;
 
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONObject;
 import org.onap.datalake.feeder.dto.TopicConfig;
-import org.onap.datalake.feeder.enumeration.DataFormat;
 
 import com.fasterxml.jackson.annotation.JsonBackReference;
 
@@ -54,11 +51,10 @@ import lombok.Setter;
 @Table(name = "topic")
 public class Topic {
        @Id
-       @Column(name="`name`")
+       @Column(name = "`name`")
        private String name;//topic name 
 
-
-               //for protected Kafka topics
+       //for protected Kafka topics
        @Column(name = "`login`")
        private String login;
 
@@ -69,16 +65,13 @@ public class Topic {
        @JsonBackReference
        //@JsonManagedReference
        @ManyToMany(fetch = FetchType.EAGER)
-    @JoinTable(        name                            = "map_db_topic",
-                       joinColumns             = {  @JoinColumn(name="topic_name")  },
-                       inverseJoinColumns      = {  @JoinColumn(name="db_name")  }
-    )
+       @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_name") }, inverseJoinColumns = { @JoinColumn(name = "db_name") })
        protected Set<Db> dbs;
 
        /**
         * indicate if we should monitor this topic
         */
-       @Column(name="`enabled`")
+       @Column(name = "`enabled`")
        private Boolean enabled;
 
        /**
@@ -88,10 +81,10 @@ public class Topic {
        private Boolean saveRaw;
 
        /**
-        * need to explicitly tell feeder the data format of the message.
-        * support JSON, XML, YAML, TEXT
+        * need to explicitly tell feeder the data format of the message. support JSON,
+        * XML, YAML, TEXT
         */
-       @Column(name="`data_format`")
+       @Column(name = "`data_format`")
        private String dataFormat;
 
        /**
@@ -114,22 +107,6 @@ public class Topic {
                this.name = name;
        }
 
-       public Topic clone() {  //TODO will use TopicConfig
-               Topic ret = new Topic();
-               ret.setCorrelateClearedMessage(correlateClearedMessage);
-               ret.setDataFormat(dataFormat);
-               ret.setDbs(dbs);
-               ret.setEnabled(enabled);
-               ret.setLogin(login);
-               ret.setMessageIdPath(messageIdPath);
-               ret.setName(name);
-               ret.setPass(pass);
-               ret.setSaveRaw(saveRaw);
-               ret.setTtl(ttl);
-               
-               return ret;
-       }
-       
        public boolean isDefault() {
                return "_DL_DEFAULT_".equals(name);
        }
@@ -145,19 +122,11 @@ public class Topic {
        public int getTtl() {
                if (ttl != null) {
                        return ttl;
-               }  else {
+               } else {
                        return 3650;//default to 10 years for safe
                }
        }
 
-       public DataFormat getDataFormat() {
-               if (dataFormat != null) {
-                       return DataFormat.fromString(dataFormat);
-               }  else {
-                       return null;
-               }
-       }
-
        private boolean is(Boolean b) {
                return is(b, false);
        }
@@ -165,7 +134,7 @@ public class Topic {
        private boolean is(Boolean b, boolean defaultValue) {
                if (b != null) {
                        return b;
-               }  else {
+               } else {
                        return defaultValue;
                }
        }
@@ -174,71 +143,25 @@ public class Topic {
                return is(saveRaw);
        }
 
-       public boolean supportElasticsearch() {
-               return containDb("Elasticsearch");//TODO string hard codes
-       }
-
-       public boolean supportCouchbase() {
-               return containDb("Couchbase");
-       }
-
-       public boolean supportDruid() {
-               return containDb("Druid");
-       }
-
-       public boolean supportMongoDB() {
-               return containDb("MongoDB");
-       }
-
-       private boolean containDb(String dbName) {
-               Db db = new Db(dbName);
-
-               if (dbs != null && dbs.contains(db)) {
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
-       //extract DB id from JSON attributes, support multiple attributes
-       public String getMessageId(JSONObject json) {
-               String id = null;
-
-               if (StringUtils.isNotBlank(messageIdPath)) {
-                       String[] paths = messageIdPath.split(",");
-
-                       StringBuilder sb = new StringBuilder();
-                       for (int i = 0; i < paths.length; i++) {
-                               if (i > 0) {
-                                       sb.append('^');
-                               }
-                               sb.append(json.query(paths[i]).toString());
-                       }
-                       id = sb.toString();
-               }
-
-               return id;
-       }
-
        public TopicConfig getTopicConfig() {
                TopicConfig tConfig = new TopicConfig();
-               
+
                tConfig.setName(getName());
-               tConfig.setEnable(getEnabled());
-               if(getDataFormat() != null)
-                       tConfig.setDataFormat(getDataFormat().toString());
-               tConfig.setSaveRaw(getSaveRaw());
-               tConfig.setCorrelatedClearredMessage((getCorrelateClearedMessage() == null) ? getCorrelateClearedMessage() : false);
+               tConfig.setEnabled(isEnabled());
+               tConfig.setDataFormat(dataFormat);
+               tConfig.setSaveRaw(isSaveRaw());
+               tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
                tConfig.setMessageIdPath(getMessageIdPath());
                tConfig.setTtl(getTtl());
                Set<Db> topicDb = getDbs();
                List<String> dbList = new ArrayList<>();
-               for(Db item: topicDb)
-               {
-                       dbList.add(item.getName());
+               if (topicDb != null) {
+                       for (Db item : topicDb) {
+                               dbList.add(item.getName());
+                       }
                }
                tConfig.setSinkdbs(dbList);
-               
+
                return tConfig;
        }
 
index 76b41cb..15ffc8a 100644 (file)
@@ -22,14 +22,12 @@ package org.onap.datalake.feeder.dto;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.onap.datalake.feeder.domain.Topic;
-import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.repository.DbRepository;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
+import org.onap.datalake.feeder.enumeration.DataFormat;
 
 /**
  * JSON request body for Topic manipulation.
@@ -43,17 +41,85 @@ import java.util.Set;
 
 public class TopicConfig {
 
-    private String name;
-    private String login;
-    private String password;
-    private List<String> sinkdbs;
-    private boolean enable;
-    private boolean saveRaw;
-    private String dataFormat;
-    private int ttl;
-    private boolean correlatedClearredMessage;
-    private String messageIdPath;
+       private String name;
+       private String login;
+       private String password;
+       private List<String> sinkdbs;
+       private boolean enabled;
+       private boolean saveRaw;
+       private String dataFormat;
+       private int ttl;
+       private boolean correlateClearedMessage;
+       private String messageIdPath;
 
+       public DataFormat getDataFormat2() {
+               if (dataFormat != null) {
+                       return DataFormat.fromString(dataFormat);
+               } else {
+                       return null;
+               }
+       }
 
+       
+       public boolean supportElasticsearch() {
+               return containDb("Elasticsearch");//TODO string hard codes
+       }
+
+       public boolean supportCouchbase() {
+               return containDb("Couchbase");
+       }
+
+       public boolean supportDruid() {
+               return containDb("Druid");
+       }
+
+       public boolean supportMongoDB() {
+               return containDb("MongoDB");
+       }
+
+       private boolean containDb(String dbName) {
+               return (sinkdbs != null && sinkdbs.contains(dbName));
+       }
+
+       //extract DB id from JSON attributes, support multiple attributes
+       public String getMessageId(JSONObject json) {
+               String id = null;
+
+               if (StringUtils.isNotBlank(messageIdPath)) {
+                       String[] paths = messageIdPath.split(",");
+
+                       StringBuilder sb = new StringBuilder();
+                       for (int i = 0; i < paths.length; i++) {
+                               if (i > 0) {
+                                       sb.append('^');
+                               }
+                               sb.append(json.query(paths[i]).toString());
+                       }
+                       id = sb.toString();
+               }
+
+               return id;
+       }
+
+       @Override
+       public String toString() {
+               return name;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == null)
+                       return false;
+
+               if (this.getClass() != obj.getClass())
+                       return false;
+
+               return name.equals(((TopicConfig) obj).getName());
+       }
+
+       @Override
+       public int hashCode() {
+               return name.hashCode();
+       }
 
 }
index 1e5fb78..12d03ee 100644 (file)
@@ -30,6 +30,7 @@ import org.json.JSONObject;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
 import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,7 +91,7 @@ public class CouchbaseService {
                bucket.close();
        } 
 
-       public void saveJsons(Topic topic, List<JSONObject> jsons) { 
+       public void saveJsons(TopicConfig topic, List<JSONObject> jsons) { 
                List<JsonDocument> documents= new ArrayList<>(jsons.size());
                for(JSONObject json : jsons) {
                        //convert to Couchbase JsonObject from org.json JSONObject
@@ -109,7 +110,7 @@ public class CouchbaseService {
                log.debug("saved text to topic = {}, this batch count = {} ", topic, documents.size()); 
        }
 
-       public String getId(Topic topic, JSONObject json) {
+       public String getId(TopicConfig topic, JSONObject json) {
                //if this topic requires extract id from JSON
                String id = topic.getMessageId(json);
                if(id != null) {
index 270db93..de8c9e8 100644 (file)
@@ -25,13 +25,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import javax.annotation.PostConstruct;
-
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -54,12 +52,6 @@ public class DmaapService {
        @Autowired
        private TopicService topicService;
 
-       
-       @PostConstruct
-       private void init() {           
-
-       }
-
        //get all topic names from Zookeeper
        public List<String> getTopics() {
                try {
@@ -67,28 +59,32 @@ public class DmaapService {
                                @Override
                                public void process(WatchedEvent event) {
                                        // TODO monitor new topics
-                                       
+
                                }
-               };
+                       };
                        ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
-               List<String> topics = zk.getChildren("/brokers/topics", false);
-               return topics;
+                       List<String> topics = zk.getChildren("/brokers/topics", false);
+                       String[] excludes = config.getDmaapKafkaExclude();
+                       for (String exclude : excludes) {
+                               topics.remove(exclude);
+                       }
+                       return topics;
                } catch (Exception e) {
                        log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
-                       return null;
+                       return Collections.emptyList();
                }
-       }       
+       }
 
-       public List<String> getActiveTopics() throws IOException {               
+       public List<String> getActiveTopics() throws IOException {
                List<String> allTopics = getTopics();
-               if(allTopics == null) {
+               if (allTopics == null) {
                        return Collections.emptyList();
                }
 
                List<String> ret = new ArrayList<>();
                for (String topicStr : allTopics) {
-                       Topic topic = topicService.getEffectiveTopic(topicStr, true);
-                       if (topic.isEnabled()) {
+                       TopicConfig topicConfig = topicService.getEffectiveTopic(topicStr, true);
+                       if (topicConfig.isEnabled()) {
                                ret.add(topicStr);
                        }
                }
index 30aa733..4090e7e 100644 (file)
@@ -47,7 +47,7 @@ import org.elasticsearch.rest.RestStatus;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,6 +112,7 @@ public class ElasticsearchService {
                
                boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
                if(!exists){
+                       //TODO submit mapping template
                        CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); 
                        CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
                        log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
@@ -119,7 +120,7 @@ public class ElasticsearchService {
        }
        
        //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
-       public void saveJsons(Topic topic, List<JSONObject> jsons) {
+       public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
                BulkRequest request = new BulkRequest();
 
                for (JSONObject json : jsons) {
@@ -134,6 +135,9 @@ public class ElasticsearchService {
 
                        request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
                }
+
+               log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
+               
                if(config.isAsync()) {
                        client.bulkAsync(request, RequestOptions.DEFAULT, listener);                    
                }else {
@@ -155,7 +159,7 @@ public class ElasticsearchService {
         * The search API can only query all data or based on the fields in the source.
         * So use the get API, three parameters: index, type, document id
         */
-       private boolean correlateClearedMessage(Topic topic, JSONObject json) {
+       private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) {
                boolean found = false;
                String eName = null;
 
index fb3f806..02c80a4 100644 (file)
@@ -34,7 +34,7 @@ import org.bson.Document;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +48,6 @@ import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
-import com.mongodb.DB;
 
 /**
  * Service for using MongoDB
@@ -131,7 +130,7 @@ public class MongodbService {
                mongoClient.close();
        }
 
-       public void saveJsons(Topic topic, List<JSONObject> jsons) {
+       public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
                if(dbReady == false)
                        return;
                List<Document> documents = new ArrayList<>(jsons.size());
@@ -150,7 +149,7 @@ public class MongodbService {
                MongoCollection<Document> collection = mongoCollectionMap.computeIfAbsent(collectionName, k -> database.getCollection(k));
                collection.insertMany(documents);
 
-               log.debug("saved text to topic = {}, topic total count = {} ", topic, collection.countDocuments());
+               log.debug("saved text to topic = {}, batch count = {} ", topic, jsons.size());
        }
 
 }
index ce671a9..b3a6d29 100644 (file)
@@ -82,6 +82,7 @@ public class PullThread implements Runnable {
        private void init() {
                async = config.isAsync();
                Properties consumerConfig = getConsumerConfig();
+               log.info("Kafka ConsumerConfig: {}", consumerConfig);
                consumer = new KafkaConsumer<>(consumerConfig);
        }
 
index a4f7910..449dacf 100644 (file)
@@ -36,6 +36,7 @@ import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +77,7 @@ public class StoreService {
        @Autowired
        private ElasticsearchService elasticsearchService;
 
-       private Map<String, Topic> topicMap = new HashMap<>();
+       private Map<String, TopicConfig> topicMap = new HashMap<>();
 
        private ObjectMapper yamlReader;
 
@@ -94,7 +95,7 @@ public class StoreService {
                        return;
                }
 
-               Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
+               TopicConfig topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
                        return topicService.getEffectiveTopic(topicStr);
                });
 
@@ -111,7 +112,7 @@ public class StoreService {
                saveJsons(topic, docs);
        }
 
-       private JSONObject messageToJson(Topic topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
+       private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
 
                long timestamp = pair.getLeft();
                String text = pair.getRight();
@@ -126,7 +127,7 @@ public class StoreService {
 
                JSONObject json = null;
 
-               DataFormat dataFormat = topic.getDataFormat();
+               DataFormat dataFormat = topic.getDataFormat2();
 
                switch (dataFormat) {
                case JSON:
@@ -160,7 +161,7 @@ public class StoreService {
                return json;
        }
 
-       private void saveJsons(Topic topic, List<JSONObject> jsons) {
+       private void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
                if (topic.supportMongoDB()) {
                        mongodbService.saveJsons(topic, jsons);
                }
index 7ae3ff7..f0b000b 100644 (file)
@@ -60,7 +60,7 @@ public class TopicService {
        @Autowired
        private DbRepository dbRepository;
 
-       public Topic getEffectiveTopic(String topicStr) {
+       public TopicConfig getEffectiveTopic(String topicStr) {
                try {
                        return getEffectiveTopic(topicStr, false);
                } catch (IOException e) {
@@ -69,17 +69,18 @@ public class TopicService {
                return null;
        }
 
-       public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
+       public TopicConfig getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
                Topic topic = getTopic(topicStr);
                if (topic == null) {
-                       topic = getDefaultTopic().clone();
-                       topic.setName(topicStr);
+                       topic = getDefaultTopic();
                }
+               TopicConfig topicConfig = topic.getTopicConfig();
+               topicConfig.setName(topicStr);//need to change name if it comes from DefaultTopic
                
-               if(ensureTableExist && topic.isEnabled() && topic.supportElasticsearch()) { 
+               if(ensureTableExist && topicConfig.isEnabled() && topicConfig.supportElasticsearch()) {
                        elasticsearchService.ensureTableExist(topicStr); 
                }
-               return topic;
+               return topicConfig;
        }
 
        public Topic getTopic(String topicStr) {
@@ -116,10 +117,10 @@ public class TopicService {
                topic.setName(tConfig.getName());
                topic.setLogin(tConfig.getLogin());
                topic.setPass(tConfig.getPassword());
-               topic.setEnabled(tConfig.isEnable());
+               topic.setEnabled(tConfig.isEnabled());
                topic.setSaveRaw(tConfig.isSaveRaw());
                topic.setTtl(tConfig.getTtl());
-               topic.setCorrelateClearedMessage(tConfig.isCorrelatedClearredMessage());
+               topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
                topic.setDataFormat(tConfig.getDataFormat());
                topic.setMessageIdPath(tConfig.getMessageIdPath());
 
index dfe48a2..e1b8399 100644 (file)
@@ -27,6 +27,8 @@ dmaapZookeeperHostPort=message-router-zookeeper:2181
 dmaapKafkaHostPort=message-router-kafka:9092
 dmaapKafkaGroup=dlgroup10
 dmaapKafkaTimeout=60
+dmaapKafkaExclude[0]=__consumer_offsets
+dmaapKafkaExclude[1]=msgrtr.apinode.metrics.dmaap
 #check for new topics 
 dmaapCheckNewTopicIntervalInSec=3000
 
index 7c2bf91..e96d940 100644 (file)
@@ -136,12 +136,12 @@ public class TopicControllerTest {
         when(topicRepository.findById("a")).thenReturn(Optional.of(a));
         TopicConfig ac = new TopicConfig();
         ac.setName("a");
-        ac.setEnable(true);
+        ac.setEnabled(true);
         PostReturnBody<TopicConfig> postConfig1 = topicController.updateTopic("a", ac, mockBindingResult, httpServletResponse);
         assertEquals(200, postConfig1.getStatusCode());
         TopicConfig ret = postConfig1.getReturnBody();
         assertEquals("a", ret.getName());
-        assertEquals(true, ret.isEnable());
+        assertEquals(true, ret.isEnabled());
         when(mockBindingResult.hasErrors()).thenReturn(true);
         PostReturnBody<TopicConfig> postConfig2 = topicController.updateTopic("a", ac, mockBindingResult, httpServletResponse);
         assertEquals(null, postConfig2);
index b583473..74f0884 100644 (file)
@@ -19,7 +19,6 @@
  */
 package org.onap.datalake.feeder.domain;
 
-import org.json.JSONObject;
 import org.junit.Test;
 import org.onap.datalake.feeder.enumeration.DataFormat;
 
@@ -27,7 +26,6 @@ import java.util.HashSet;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -38,35 +36,9 @@ import static org.junit.Assert.assertTrue;
 
 public class TopicTest {
 
-    @Test
-    public void getMessageId() {
-        String text = "{ data: { data2 : { value : 'hello'}}}";
-
-        JSONObject json = new JSONObject(text);
-
-        Topic topic = new Topic("test getMessageId");
-        topic.setMessageIdPath("/data/data2/value");
-
-        String value = topic.getMessageId(json);
-
-        assertEquals(value, "hello");
-    }
-
     @Test
     public void getMessageIdFromMultipleAttributes() {
-        String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}";
-
-        JSONObject json = new JSONObject(text);
-
-        Topic topic = new Topic("test getMessageId");
-        topic.setMessageIdPath("/data/data2/value,/data/data3");
-
-        String value = topic.getMessageId(json);
-        assertEquals(value, "hello^world");
-
-        topic.setMessageIdPath("");
-        assertNull(topic.getMessageId(json));
-
+        Topic topic = new Topic("test getMessageId"); 
         Topic defaultTopic = new Topic("_DL_DEFAULT_");
         Topic testTopic = new Topic("test");
 
@@ -99,13 +71,6 @@ public class TopicTest {
 
         defaultTopic.setDbs(new HashSet<>());
         defaultTopic.getDbs().add(new Db("Elasticsearch"));
-        assertTrue(defaultTopic.supportElasticsearch());
-        assertFalse(testTopic.supportCouchbase());
-        assertFalse(testTopic.supportDruid());
-        assertFalse(testTopic.supportMongoDB());
-
-        defaultTopic.getDbs().remove(new Db("Elasticsearch"));
-        assertFalse(testTopic.supportElasticsearch());
 
         assertEquals(defaultTopic.getDataFormat(), null);
         defaultTopic.setCorrelateClearedMessage(true);
@@ -116,7 +81,7 @@ public class TopicTest {
         assertTrue(defaultTopic.isEnabled());
         assertTrue(defaultTopic.isSaveRaw());
 
-        assertEquals(defaultTopic.getDataFormat(), DataFormat.XML);
+        assertEquals(defaultTopic.getTopicConfig().getDataFormat2(), DataFormat.XML);
 
         defaultTopic.setDataFormat(null);
         assertEquals(testTopic.getDataFormat(), null);
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
new file mode 100644 (file)
index 0000000..c65e920
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.datalake.feeder.dto;
+
+import org.json.JSONObject;
+import org.junit.Test;
+import org.onap.datalake.feeder.domain.Db;
+import org.onap.datalake.feeder.domain.Topic;
+
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Topic
+ *
+ * @author Guobiao Mo
+ */
+
+public class TopicConfigTest {
+
+    @Test
+    public void getMessageId() {
+        String text = "{ data: { data2 : { value : 'hello'}}}";
+
+        JSONObject json = new JSONObject(text);
+
+        Topic topic = new Topic("test getMessageId");
+        topic.setMessageIdPath("/data/data2/value");
+        
+        TopicConfig topicConfig = topic.getTopicConfig();
+
+        String value = topicConfig.getMessageId(json);
+
+        assertEquals(value, "hello");
+    }
+
+    @Test
+    public void getMessageIdFromMultipleAttributes() {
+        String text = "{ data: { data2 : { value : 'hello'}, data3 : 'world'}}";
+
+        JSONObject json = new JSONObject(text);
+
+        Topic topic = new Topic("test getMessageId");
+        topic.setMessageIdPath("/data/data2/value,/data/data3");
+
+        TopicConfig topicConfig = topic.getTopicConfig();
+        
+        String value = topicConfig.getMessageId(json);
+        assertEquals(value, "hello^world");
+
+        topic.setMessageIdPath("");
+        topicConfig = topic.getTopicConfig();
+        assertNull(topicConfig.getMessageId(json));
+
+    }
+
+    @Test
+    public void testIs() {
+        Topic testTopic = new Topic("test");
+
+        assertTrue(testTopic.equals(new Topic("test")));
+        assertEquals(testTopic.hashCode(), (new Topic("test")).hashCode());
+
+        testTopic.setDbs(new HashSet<>());
+        testTopic.getDbs().add(new Db("Elasticsearch"));
+        
+        TopicConfig testTopicConfig = testTopic.getTopicConfig();
+        
+        assertTrue(testTopicConfig.supportElasticsearch());
+        assertFalse(testTopicConfig.supportCouchbase());
+        assertFalse(testTopicConfig.supportDruid());
+        assertFalse(testTopicConfig.supportMongoDB());
+
+        testTopic.getDbs().remove(new Db("Elasticsearch"));
+        testTopicConfig = testTopic.getTopicConfig();
+        assertFalse(testTopicConfig.supportElasticsearch());
+    }
+}
index 9e1b2d9..0efde44 100755 (executable)
@@ -114,7 +114,7 @@ public class CouchbaseServiceTest {
         CouchbaseService couchbaseService = new CouchbaseService();
         couchbaseService.bucket = bucket;
         couchbaseService.config = appConfig;
-        couchbaseService.saveJsons(topic, jsons);
+        couchbaseService.saveJsons(topic.getTopicConfig(), jsons);
 
     }
 
@@ -134,7 +134,7 @@ public class CouchbaseServiceTest {
         CouchbaseService couchbaseService = new CouchbaseService();
         couchbaseService.bucket = bucket;
         couchbaseService.config = appConfig;
-        couchbaseService.saveJsons(topic, jsons);
+        couchbaseService.saveJsons(topic.getTopicConfig(), jsons);
     }
 
     @Test
index de2c167..9590b0a 100644 (file)
@@ -89,7 +89,7 @@ public class ElasticsearchServiceTest {
         when(config.getElasticsearchType()).thenReturn("doc");
         when(config.isAsync()).thenReturn(true);
 
-        elasticsearchService.saveJsons(topic, jsons);
+        elasticsearchService.saveJsons(topic.getTopicConfig(), jsons);
 
     }
 }
\ No newline at end of file
index 4185676..016381b 100644 (file)
@@ -83,6 +83,6 @@ public class MongodbServiceTest {
         jsons.add(jsonObject);
         jsons.add(jsonObject2);
 
-        mongodbService.saveJsons(topic, jsons);
+        mongodbService.saveJsons(topic.getTopicConfig(), jsons);
     }
 }
\ No newline at end of file