Merge "Modify TopicConfig"
[dcaegen2/services.git] / components / datalake-handler / feeder / src / main / java / org / onap / datalake / feeder / domain / Topic.java
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DataLake
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20 package org.onap.datalake.feeder.domain;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Set;
25
26 import javax.persistence.Column;
27 import javax.persistence.Entity;
28 import javax.persistence.FetchType;
29 import javax.persistence.Id;
30 import javax.persistence.JoinColumn;
31 import javax.persistence.JoinTable;
32 import javax.persistence.ManyToMany;
33 import javax.persistence.Table;
34
35 import org.apache.commons.lang3.StringUtils;
36 import org.json.JSONObject;
37 import org.onap.datalake.feeder.dto.TopicConfig;
38 import org.onap.datalake.feeder.enumeration.DataFormat;
39
40 import com.fasterxml.jackson.annotation.JsonBackReference;
41
42 import lombok.Getter;
43 import lombok.Setter;
44
45 /**
46  * Domain class representing topic
47  * 
48  * @author Guobiao Mo
49  *
50  */
51 @Setter
52 @Getter
53 @Entity
54 @Table(name = "topic")
55 public class Topic {
56         @Id
57         @Column(name="`name`")
58         private String name;//topic name 
59
60
61                 //for protected Kafka topics
62         @Column(name = "`login`")
63         private String login;
64
65         @Column(name = "`pass`")
66         private String pass;
67
68         //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
69         @JsonBackReference
70         //@JsonManagedReference
71         @ManyToMany(fetch = FetchType.EAGER)
72     @JoinTable( name                            = "map_db_topic",
73                         joinColumns             = {  @JoinColumn(name="topic_name")  },
74                         inverseJoinColumns      = {  @JoinColumn(name="db_name")  }
75     )
76         protected Set<Db> dbs;
77
78         /**
79          * indicate if we should monitor this topic
80          */
81         @Column(name="`enabled`")
82         private Boolean enabled;
83
84         /**
85          * save raw message text
86          */
87         @Column(name = "`save_raw`")
88         private Boolean saveRaw;
89
90         /**
91          * need to explicitly tell feeder the data format of the message.
92          * support JSON, XML, YAML, TEXT
93          */
94         @Column(name="`data_format`")
95         private String dataFormat;
96
97         /**
98          * TTL in day
99          */
100         private Integer ttl;
101
102         //if this flag is true, need to correlate alarm cleared message to previous alarm 
103         @Column(name = "`correlate_cleared_message`")
104         private Boolean correlateClearedMessage;
105
106         //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
107         @Column(name = "`message_id_path`")
108         private String messageIdPath;
109
110         public Topic() {
111         }
112
113         public Topic(String name) {
114                 this.name = name;
115         }
116
117         public Topic clone() {  //TODO will use TopicConfig
118                 Topic ret = new Topic();
119                 ret.setCorrelateClearedMessage(correlateClearedMessage);
120                 ret.setDataFormat(dataFormat);
121                 ret.setDbs(dbs);
122                 ret.setEnabled(enabled);
123                 ret.setLogin(login);
124                 ret.setMessageIdPath(messageIdPath);
125                 ret.setName(name);
126                 ret.setPass(pass);
127                 ret.setSaveRaw(saveRaw);
128                 ret.setTtl(ttl);
129                 
130                 return ret;
131         }
132         
133         public boolean isDefault() {
134                 return "_DL_DEFAULT_".equals(name);
135         }
136
137         public boolean isEnabled() {
138                 return is(enabled);
139         }
140
141         public boolean isCorrelateClearedMessage() {
142                 return is(correlateClearedMessage);
143         }
144
145         public int getTtl() {
146                 if (ttl != null) {
147                         return ttl;
148                 }  else {
149                         return 3650;//default to 10 years for safe
150                 }
151         }
152
153         public DataFormat getDataFormat() {
154                 if (dataFormat != null) {
155                         return DataFormat.fromString(dataFormat);
156                 }  else {
157                         return null;
158                 }
159         }
160
161         private boolean is(Boolean b) {
162                 return is(b, false);
163         }
164
165         private boolean is(Boolean b, boolean defaultValue) {
166                 if (b != null) {
167                         return b;
168                 }  else {
169                         return defaultValue;
170                 }
171         }
172
173         public boolean isSaveRaw() {
174                 return is(saveRaw);
175         }
176
177         public boolean supportElasticsearch() {
178                 return containDb("Elasticsearch");//TODO string hard codes
179         }
180
181         public boolean supportCouchbase() {
182                 return containDb("Couchbase");
183         }
184
185         public boolean supportDruid() {
186                 return containDb("Druid");
187         }
188
189         public boolean supportMongoDB() {
190                 return containDb("MongoDB");
191         }
192
193         private boolean containDb(String dbName) {
194                 Db db = new Db(dbName);
195
196                 if (dbs != null && dbs.contains(db)) {
197                         return true;
198                 } else {
199                         return false;
200                 }
201         }
202
203         //extract DB id from JSON attributes, support multiple attributes
204         public String getMessageId(JSONObject json) {
205                 String id = null;
206
207                 if (StringUtils.isNotBlank(messageIdPath)) {
208                         String[] paths = messageIdPath.split(",");
209
210                         StringBuilder sb = new StringBuilder();
211                         for (int i = 0; i < paths.length; i++) {
212                                 if (i > 0) {
213                                         sb.append('^');
214                                 }
215                                 sb.append(json.query(paths[i]).toString());
216                         }
217                         id = sb.toString();
218                 }
219
220                 return id;
221         }
222
223         public TopicConfig getTopicConfig() {
224                 TopicConfig tConfig = new TopicConfig();
225                 
226                 tConfig.setName(getName());
227                 tConfig.setEnable(getEnabled());
228                 if(getDataFormat() != null)
229                         tConfig.setDataFormat(getDataFormat().toString());
230                 tConfig.setSaveRaw(getSaveRaw());
231                 tConfig.setCorrelatedClearredMessage((getCorrelateClearedMessage() == null) ? getCorrelateClearedMessage() : false);
232                 tConfig.setMessageIdPath(getMessageIdPath());
233                 tConfig.setTtl(getTtl());
234                 Set<Db> topicDb = getDbs();
235                 List<String> dbList = new ArrayList<>();
236                 for(Db item: topicDb)
237                 {
238                         dbList.add(item.getName());
239                 }
240                 tConfig.setSinkdbs(dbList);
241                 
242                 return tConfig;
243         }
244
245         @Override
246         public String toString() {
247                 return name;
248         }
249
250         @Override
251         public boolean equals(Object obj) {
252                 if (obj == null)
253                         return false;
254
255                 if (this.getClass() != obj.getClass())
256                         return false;
257
258                 return name.equals(((Topic) obj).getName());
259         }
260
261         @Override
262         public int hashCode() {
263                 return name.hashCode();
264         }
265
266 }