[DCAEGEN2] Release dcaegen2-services-kpi-computation-ms container
[dcaegen2/services.git] / components / datalake-handler / feeder / src / main / java / org / onap / datalake / feeder / domain / Topic.java
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DataLake
4 * ================================================================================
5 * Copyright 2019-2020 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20 package org.onap.datalake.feeder.domain;
21
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.Map;
27 import java.util.HashMap;
28
29 import javax.persistence.Column;
30 import javax.persistence.Entity;
31 import javax.persistence.FetchType;
32 import javax.persistence.Id;
33 import javax.persistence.JoinColumn;
34 import javax.persistence.JoinTable;
35 import javax.persistence.ManyToMany;
36 import javax.persistence.ManyToOne;
37 import javax.persistence.Table;
38 import javax.persistence.GenerationType;
39 import javax.persistence.GeneratedValue;
40
41 import org.apache.commons.lang3.StringUtils;
42 import org.json.JSONObject;
43 import org.onap.datalake.feeder.dto.TopicConfig;
44 import org.onap.datalake.feeder.enumeration.DataFormat;
45
46 import com.fasterxml.jackson.annotation.JsonBackReference;
47
48 import lombok.Getter;
49 import lombok.Setter;
50
51 /**
52  * Domain class representing topic
53  * 
54  * @author Guobiao Mo
55  *
56  */
57 @Setter
58 @Getter
59 @Entity
60 @Table(name = "topic")
61 public class Topic {
62         @Id
63     @Column(name = "`id`")
64         @GeneratedValue(strategy = GenerationType.IDENTITY)
65     private Integer id;
66
67         @ManyToOne(fetch = FetchType.EAGER)
68     @JoinColumn(name = "topic_name_id", nullable = false)
69         private TopicName topicName;//topic name 
70         
71         //for protected Kafka topics
72         @Column(name = "`login`")
73         private String login;
74
75         @Column(name = "`pass`")
76         private String pass;
77
78         //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
79         @JsonBackReference
80         //@JsonManagedReference
81         @ManyToMany(fetch = FetchType.EAGER)
82         @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") })
83         protected Set<Db> dbs=new HashSet<>();
84
85         @ManyToMany(fetch = FetchType.EAGER)
86         @JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") })
87         protected Set<Kafka> kafkas=new HashSet<>();
88
89         /**
90          * indicate if we should monitor this topic
91          */
92         @Column(name = "`enabled`", nullable = false)
93         private boolean enabled;
94
95         /**
96          * save raw message text
97          */
98         @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
99         private boolean saveRaw;
100
101         /**
102          * need to explicitly tell feeder the data format of the message. support JSON,
103          * XML, YAML, TEXT
104          */
105         @Column(name = "`data_format`")
106         protected String dataFormat;
107
108         /**
109          * TTL in day
110          */
111         @Column(name = "`ttl_day`")
112         private Integer ttl;
113
114         //if this flag is true, need to correlate alarm cleared message to previous alarm 
115         @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
116         private boolean correlateClearedMessage;
117
118         //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
119         @Column(name = "`message_id_path`")
120         protected String messageIdPath;
121
122         //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
123         @Column(name = "`aggregate_array_path`")
124         protected String aggregateArrayPath;
125
126         //paths to the element in array that need flatten, this element is used as label, comma separated, 
127         //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
128         @Column(name = "`flatten_array_path`")
129         protected String flattenArrayPath;
130         
131         public String getName() {
132                 return topicName.getId();
133         }
134         
135         public int getTtl() {
136                 if (ttl != null) {
137                         return ttl;
138                 } else {
139                         return 3650;//default to 10 years for safe
140                 }
141         }
142
143         public DataFormat getDataFormat2() {
144                 if (dataFormat != null) {
145                         return DataFormat.fromString(dataFormat);
146                 } else {
147                         return null;
148                 }
149         }
150
151         public String[] getAggregateArrayPath2() {
152                 String[] ret = null;
153
154                 if (StringUtils.isNotBlank(aggregateArrayPath)) {
155                         ret = aggregateArrayPath.split(",");
156                 }
157
158                 return ret;
159         }
160
161         public String[] getFlattenArrayPath2() {
162                 String[] ret = null;
163
164                 if (StringUtils.isNotBlank(flattenArrayPath)) {
165                         ret = flattenArrayPath.split(",");
166                 }
167
168                 return ret;
169         }
170
171         //extract DB id from JSON attributes, support multiple attributes
172         public String getMessageId(JSONObject json) {
173                 String ret = null;
174
175                 if (StringUtils.isNotBlank(messageIdPath)) {
176                         String[] paths = messageIdPath.split(",");
177
178                         StringBuilder sb = new StringBuilder();
179                         for (int i = 0; i < paths.length; i++) {
180                                 if (i > 0) {
181                                         sb.append('^');
182                                 }
183                                 sb.append(json.query(paths[i]).toString());
184                         }
185                         ret = sb.toString();
186                 }
187
188                 return ret;
189         }
190
191         public TopicConfig getTopicConfig() {
192                 TopicConfig tConfig = new TopicConfig();
193
194                 tConfig.setId(getId());
195                 tConfig.setName(getName());
196                 tConfig.setLogin(getLogin());
197                 tConfig.setEnabled(isEnabled());
198                 tConfig.setDataFormat(dataFormat);
199                 tConfig.setSaveRaw(isSaveRaw());
200                 tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
201                 tConfig.setMessageIdPath(getMessageIdPath());
202                 tConfig.setAggregateArrayPath(getAggregateArrayPath());
203                 tConfig.setFlattenArrayPath(getFlattenArrayPath());
204                 tConfig.setTtl(getTtl());
205                 
206                 Set<Db> topicDb = getDbs();
207                 List<Integer> dbList = new ArrayList<>();
208                 List<Integer> enabledDbList = new ArrayList<>();
209                 List<String> enabledDbList2 = new ArrayList<>();
210                 if (topicDb != null) {
211                         for (Db item : topicDb) {
212                                 dbList.add(item.getId());
213                                 if(item.isEnabled()) {
214                                         enabledDbList.add(item.getId());
215                                         enabledDbList2.add(item.getDbType().getId());
216                                 }
217                         }
218                 }
219                 tConfig.setSinkdbs(dbList);
220                 tConfig.setEnabledSinkdbs(enabledDbList);
221                 Map<String,Integer> map = new HashMap<>();
222                 for (String string : enabledDbList2) {
223                         if(map.containsKey(string)) {
224                                 map.put(string, map.get(string).intValue()+1);
225                         }else {
226                                 map.put(string, new Integer(1));
227                         }
228                 }
229                 tConfig.setCountsDb(map);
230
231                 Set<Kafka> topicKafka = getKafkas();
232                 List<Integer> kafkaList = new ArrayList<>();
233                 if (topicKafka != null) {
234                         for (Kafka kafka : topicKafka) {
235                                 kafkaList.add(kafka.getId());
236                         }
237                 }
238                 tConfig.setKafkas(kafkaList);
239                 tConfig.setCountsKafka(kafkaList.size());
240                 return tConfig;
241         }
242
243         @Override
244         public String toString() {
245                 return String.format("Topic %s (enabled=%s, dbs=%s, kafkas=%s)", topicName, enabled, dbs, kafkas);
246         }
247
248         @Override
249         public boolean equals(Object obj) {
250                 if (obj == null)
251                         return false;
252
253                 if (this.getClass() != obj.getClass())
254                         return false;
255
256                 return id.equals(((Topic) obj).getId());
257         }
258
259         @Override
260         public int hashCode() {
261                 return id;
262         }
263
264 }