2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.datalake.feeder.domain;
22 import java.util.ArrayList;
23 import java.util.List;
26 import javax.persistence.Column;
27 import javax.persistence.Entity;
28 import javax.persistence.FetchType;
29 import javax.persistence.Id;
30 import javax.persistence.JoinColumn;
31 import javax.persistence.JoinTable;
32 import javax.persistence.ManyToMany;
33 import javax.persistence.Table;
35 import org.apache.commons.lang3.StringUtils;
36 import org.json.JSONObject;
37 import org.onap.datalake.feeder.dto.TopicConfig;
38 import org.onap.datalake.feeder.enumeration.DataFormat;
40 import com.fasterxml.jackson.annotation.JsonBackReference;
46 * Domain class representing topic
54 @Table(name = "topic")
57 @Column(name="`name`")
58 private String name;//topic name
61 //for protected Kafka topics
62 @Column(name = "`login`")
65 @Column(name = "`pass`")
68 //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
70 //@JsonManagedReference
71 @ManyToMany(fetch = FetchType.EAGER)
72 @JoinTable( name = "map_db_topic",
73 joinColumns = { @JoinColumn(name="topic_name") },
74 inverseJoinColumns = { @JoinColumn(name="db_name") }
76 protected Set<Db> dbs;
79 * indicate if we should monitor this topic
81 @Column(name="`enabled`")
82 private Boolean enabled;
85 * save raw message text
87 @Column(name = "`save_raw`")
88 private Boolean saveRaw;
91 * need to explicitly tell feeder the data format of the message.
92 * support JSON, XML, YAML, TEXT
94 @Column(name="`data_format`")
95 private String dataFormat;
102 //if this flag is true, need to correlate alarm cleared message to previous alarm
103 @Column(name = "`correlate_cleared_message`")
104 private Boolean correlateClearedMessage;
106 //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
107 @Column(name = "`message_id_path`")
108 private String messageIdPath;
113 public Topic(String name) {
117 public Topic clone() { //TODO will use TopicConfig
118 Topic ret = new Topic();
119 ret.setCorrelateClearedMessage(correlateClearedMessage);
120 ret.setDataFormat(dataFormat);
122 ret.setEnabled(enabled);
124 ret.setMessageIdPath(messageIdPath);
127 ret.setSaveRaw(saveRaw);
133 public boolean isDefault() {
134 return "_DL_DEFAULT_".equals(name);
137 public boolean isEnabled() {
141 public boolean isCorrelateClearedMessage() {
142 return is(correlateClearedMessage);
145 public int getTtl() {
149 return 3650;//default to 10 years for safe
153 public DataFormat getDataFormat() {
154 if (dataFormat != null) {
155 return DataFormat.fromString(dataFormat);
161 private boolean is(Boolean b) {
165 private boolean is(Boolean b, boolean defaultValue) {
173 public boolean isSaveRaw() {
177 public boolean supportElasticsearch() {
178 return containDb("Elasticsearch");//TODO string hard codes
181 public boolean supportCouchbase() {
182 return containDb("Couchbase");
185 public boolean supportDruid() {
186 return containDb("Druid");
189 public boolean supportMongoDB() {
190 return containDb("MongoDB");
193 private boolean containDb(String dbName) {
194 Db db = new Db(dbName);
196 if (dbs != null && dbs.contains(db)) {
203 //extract DB id from JSON attributes, support multiple attributes
204 public String getMessageId(JSONObject json) {
207 if (StringUtils.isNotBlank(messageIdPath)) {
208 String[] paths = messageIdPath.split(",");
210 StringBuilder sb = new StringBuilder();
211 for (int i = 0; i < paths.length; i++) {
215 sb.append(json.query(paths[i]).toString());
223 public TopicConfig getTopicConfig() {
224 TopicConfig tConfig = new TopicConfig();
226 tConfig.setName(getName());
227 tConfig.setEnable(getEnabled());
228 if(getDataFormat() != null)
229 tConfig.setDataFormat(getDataFormat().toString());
230 tConfig.setSaveRaw(getSaveRaw());
231 tConfig.setCorrelatedClearredMessage((getCorrelateClearedMessage() == null) ? getCorrelateClearedMessage() : false);
232 tConfig.setMessageIdPath(getMessageIdPath());
233 tConfig.setTtl(getTtl());
234 Set<Db> topicDb = getDbs();
235 List<String> dbList = new ArrayList<>();
236 for(Db item: topicDb)
238 dbList.add(item.getName());
240 tConfig.setSinkdbs(dbList);
246 public String toString() {
251 public boolean equals(Object obj) {
255 if (this.getClass() != obj.getClass())
258 return name.equals(((Topic) obj).getName());
262 public int hashCode() {
263 return name.hashCode();