Check Db enabled flag
[dcaegen2/services.git] / components / datalake-handler / feeder / src / main / java / org / onap / datalake / feeder / dto / TopicConfig.java
index 76b41cb..70778bb 100644 (file)
@@ -22,14 +22,12 @@ package org.onap.datalake.feeder.dto;
 
 import lombok.Getter;
 import lombok.Setter;
-import org.onap.datalake.feeder.domain.Topic;
-import org.onap.datalake.feeder.domain.Db;
-import org.onap.datalake.feeder.repository.DbRepository;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONObject;
+import org.onap.datalake.feeder.enumeration.DataFormat;
 
 /**
  * JSON request body for Topic manipulation.
@@ -43,17 +41,111 @@ import java.util.Set;
 
 public class TopicConfig {
 
-    private String name;
-    private String login;
-    private String password;
-    private List<String> sinkdbs;
-    private boolean enable;
-    private boolean saveRaw;
-    private String dataFormat;
-    private int ttl;
-    private boolean correlatedClearredMessage;
-    private String messageIdPath;
+       private String name;
+       private String login;
+       private String password;
+       private List<String> sinkdbs;
+       private List<String> enabledSinkdbs;//only include enabled db
+       private boolean enabled;
+       private boolean saveRaw;
+       private String dataFormat;
+       private int ttl;
+       private boolean correlateClearedMessage;
+       private String messageIdPath;
+       private String aggregateArrayPath;
+       private String flattenArrayPath;
 
+       public DataFormat getDataFormat2() {
+               if (dataFormat != null) {
+                       return DataFormat.fromString(dataFormat);
+               } else {
+                       return null;
+               }
+       }
 
+       public boolean supportHdfs() {
+               return supportDb("HDFS");
+       }
+
+       public boolean supportElasticsearch() {
+               return supportDb("Elasticsearch");//TODO string hard codes
+       }
+
+       public boolean supportCouchbase() {
+               return supportDb("Couchbase");
+       }
+
+       public boolean supportDruid() {
+               return supportDb("Druid");
+       }
+
+       public boolean supportMongoDB() {
+               return supportDb("MongoDB");
+       }
+
+       private boolean supportDb(String dbName) {
+               return (enabledSinkdbs != null && enabledSinkdbs.contains(dbName));
+       }
+
+       //extract DB id from JSON attributes, support multiple attributes
+       public String getMessageId(JSONObject json) {
+               String id = null;
+
+               if (StringUtils.isNotBlank(messageIdPath)) {
+                       String[] paths = messageIdPath.split(",");
+
+                       StringBuilder sb = new StringBuilder();
+                       for (int i = 0; i < paths.length; i++) {
+                               if (i > 0) {
+                                       sb.append('^');
+                               }
+                               sb.append(json.query(paths[i]).toString());
+                       }
+                       id = sb.toString();
+               }
+
+               return id;
+       }
+
+       public String[] getAggregateArrayPath2() {
+               String[] ret = null;
+
+               if (StringUtils.isNotBlank(aggregateArrayPath)) {
+                       ret = aggregateArrayPath.split(",");
+               }
+
+               return ret;
+       }
+
+       public String[] getFlattenArrayPath2() {
+               String[] ret = null;
+
+               if (StringUtils.isNotBlank(flattenArrayPath)) {
+                       ret = flattenArrayPath.split(",");
+               }
+
+               return ret;
+       }
+
+       @Override
+       public String toString() {
+               return String.format("Topic %s(enabled=%s, enabledSinkdbs=%s)", name, enabled, enabledSinkdbs);
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (obj == null)
+                       return false;
+
+               if (this.getClass() != obj.getClass())
+                       return false;
+
+               return name.equals(((TopicConfig) obj).getName());
+       }
+
+       @Override
+       public int hashCode() {
+               return name.hashCode();
+       }
 
 }