supports multiple Kafka clusters and DBs
[dcaegen2/services.git] / components / datalake-handler / feeder / src / main / java / org / onap / datalake / feeder / domain / Topic.java
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DataLake
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20 package org.onap.datalake.feeder.domain;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26
27 import javax.persistence.Column;
28 import javax.persistence.Entity;
29 import javax.persistence.FetchType;
30 import javax.persistence.Id;
31 import javax.persistence.JoinColumn;
32 import javax.persistence.JoinTable;
33 import javax.persistence.ManyToMany;
34 import javax.persistence.ManyToOne;
35 import javax.persistence.Table;
36
37 import org.apache.commons.lang3.StringUtils;
38 import org.json.JSONObject;
39 import org.onap.datalake.feeder.dto.TopicConfig;
40 import org.onap.datalake.feeder.enumeration.DataFormat;
41
42 import com.fasterxml.jackson.annotation.JsonBackReference;
43
44 import lombok.Getter;
45 import lombok.Setter;
46
47 /**
48  * Domain class representing topic
49  * 
50  * @author Guobiao Mo
51  *
52  */
53 @Setter
54 @Getter
55 @Entity
56 @Table(name = "topic")
57 public class Topic {
58         @Id
59     @Column(name = "`id`")
60     private Integer id;
61
62         @ManyToOne(fetch = FetchType.EAGER)
63     @JoinColumn(name = "topic_name_id", nullable = false)
64         private TopicName topicName;//topic name 
65         
66         //for protected Kafka topics
67         @Column(name = "`login`")
68         private String login;
69
70         @Column(name = "`pass`")
71         private String pass;
72
73         //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
74         @JsonBackReference
75         //@JsonManagedReference
76         @ManyToMany(fetch = FetchType.EAGER)
77         @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") })
78         protected Set<Db> dbs=new HashSet<>();
79
80         @ManyToMany(fetch = FetchType.EAGER)
81         @JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") })
82         protected Set<Kafka> kafkas=new HashSet<>();
83
84         /**
85          * indicate if we should monitor this topic
86          */
87         @Column(name = "`enabled`", nullable = false)
88         private boolean enabled;
89
90         /**
91          * save raw message text
92          */
93         @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
94         private boolean saveRaw;
95
96         /**
97          * need to explicitly tell feeder the data format of the message. support JSON,
98          * XML, YAML, TEXT
99          */
100         @Column(name = "`data_format`")
101         protected String dataFormat;
102
103         /**
104          * TTL in day
105          */
106         @Column(name = "`ttl_day`")
107         private Integer ttl;
108
109         //if this flag is true, need to correlate alarm cleared message to previous alarm 
110         @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
111         private boolean correlateClearedMessage;
112
113         //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
114         @Column(name = "`message_id_path`")
115         protected String messageIdPath;
116
117         //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
118         @Column(name = "`aggregate_array_path`")
119         protected String aggregateArrayPath;
120
121         //paths to the element in array that need flatten, this element is used as label, comma separated, 
122         //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
123         @Column(name = "`flatten_array_path`")
124         protected String flattenArrayPath;
125         
126         public String getName() {
127                 return topicName.getId();
128         }
129         
130         public int getTtl() {
131                 if (ttl != null) {
132                         return ttl;
133                 } else {
134                         return 3650;//default to 10 years for safe
135                 }
136         }
137
138         public DataFormat getDataFormat2() {
139                 if (dataFormat != null) {
140                         return DataFormat.fromString(dataFormat);
141                 } else {
142                         return null;
143                 }
144         }
145
146         public String[] getAggregateArrayPath2() {
147                 String[] ret = null;
148
149                 if (StringUtils.isNotBlank(aggregateArrayPath)) {
150                         ret = aggregateArrayPath.split(",");
151                 }
152
153                 return ret;
154         }
155
156         public String[] getFlattenArrayPath2() {
157                 String[] ret = null;
158
159                 if (StringUtils.isNotBlank(flattenArrayPath)) {
160                         ret = flattenArrayPath.split(",");
161                 }
162
163                 return ret;
164         }
165
166         //extract DB id from JSON attributes, support multiple attributes
167         public String getMessageId(JSONObject json) {
168                 String ret = null;
169
170                 if (StringUtils.isNotBlank(messageIdPath)) {
171                         String[] paths = messageIdPath.split(",");
172
173                         StringBuilder sb = new StringBuilder();
174                         for (int i = 0; i < paths.length; i++) {
175                                 if (i > 0) {
176                                         sb.append('^');
177                                 }
178                                 sb.append(json.query(paths[i]).toString());
179                         }
180                         ret = sb.toString();
181                 }
182
183                 return ret;
184         }
185
186         public TopicConfig getTopicConfig() {
187                 TopicConfig tConfig = new TopicConfig();
188
189                 tConfig.setId(getId());
190                 tConfig.setName(getName());
191                 tConfig.setLogin(getLogin());
192                 tConfig.setEnabled(isEnabled());
193                 tConfig.setDataFormat(dataFormat);
194                 tConfig.setSaveRaw(isSaveRaw());
195                 tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
196                 tConfig.setMessageIdPath(getMessageIdPath());
197                 tConfig.setAggregateArrayPath(getAggregateArrayPath());
198                 tConfig.setFlattenArrayPath(getFlattenArrayPath());
199                 tConfig.setTtl(getTtl());
200                 
201                 Set<Db> topicDb = getDbs();
202                 List<String> dbList = new ArrayList<>();
203                 List<String> enabledDbList = new ArrayList<>();
204                 if (topicDb != null) {
205                         for (Db item : topicDb) {
206                                 dbList.add(item.getName());
207                                 if(item.isEnabled()) {
208                                         enabledDbList.add(item.getName());
209                                 }
210                         }
211                 }
212                 tConfig.setSinkdbs(dbList);
213                 tConfig.setEnabledSinkdbs(enabledDbList);
214
215                 Set<Kafka> topicKafka = getKafkas();
216                 List<Integer> kafkaList = new ArrayList<>();
217                 if (topicKafka != null) {
218                         for (Kafka kafka : topicKafka) {
219                                 kafkaList.add(kafka.getId());
220                         }
221                 }
222                 tConfig.setKafkas(kafkaList);
223                 return tConfig;
224         }
225
226         @Override
227         public String toString() {
228                 return String.format("Topic %s (enabled=%s, dbs=%s, kafkas=%s)", topicName, enabled, dbs, kafkas);
229         }
230
231         @Override
232         public boolean equals(Object obj) {
233                 if (obj == null)
234                         return false;
235
236                 if (this.getClass() != obj.getClass())
237                         return false;
238
239                 return id.equals(((Topic) obj).getId());
240         }
241
242         @Override
243         public int hashCode() {
244                 return id;
245         }
246
247 }