2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019-2020 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.datalake.feeder.domain;
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
27 import java.util.HashMap;
29 import javax.persistence.Column;
30 import javax.persistence.Entity;
31 import javax.persistence.FetchType;
32 import javax.persistence.Id;
33 import javax.persistence.JoinColumn;
34 import javax.persistence.JoinTable;
35 import javax.persistence.ManyToMany;
36 import javax.persistence.ManyToOne;
37 import javax.persistence.Table;
38 import javax.persistence.GenerationType;
39 import javax.persistence.GeneratedValue;
41 import org.apache.commons.lang3.StringUtils;
42 import org.json.JSONObject;
43 import org.onap.datalake.feeder.dto.TopicConfig;
44 import org.onap.datalake.feeder.enumeration.DataFormat;
46 import com.fasterxml.jackson.annotation.JsonBackReference;
52 * Domain class representing topic
60 @Table(name = "topic")
63 @Column(name = "`id`")
64 @GeneratedValue(strategy = GenerationType.IDENTITY)
67 @ManyToOne(fetch = FetchType.EAGER)
68 @JoinColumn(name = "topic_name_id", nullable = false)
69 private TopicName topicName;//topic name
71 //for protected Kafka topics
72 @Column(name = "`login`")
75 @Column(name = "`pass`")
78 //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
80 //@JsonManagedReference
81 @ManyToMany(fetch = FetchType.EAGER)
82 @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") })
83 protected Set<Db> dbs=new HashSet<>();
85 @ManyToMany(fetch = FetchType.EAGER)
86 @JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") })
87 protected Set<Kafka> kafkas=new HashSet<>();
90 * indicate if we should monitor this topic
92 @Column(name = "`enabled`", nullable = false)
93 private boolean enabled;
96 * save raw message text
98 @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
99 private boolean saveRaw;
102 * need to explicitly tell feeder the data format of the message. support JSON,
105 @Column(name = "`data_format`")
106 protected String dataFormat;
111 @Column(name = "`ttl_day`")
114 //if this flag is true, alarm-cleared messages need to be correlated with the previous alarm
115 @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
116 private boolean correlateClearedMessage;
118 //paths to the values in the JSON that are combined to compose the DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
119 @Column(name = "`message_id_path`")
120 protected String messageIdPath;
122 //paths to the arrays that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
123 @Column(name = "`aggregate_array_path`")
124 protected String aggregateArrayPath;
126 //paths to the elements in arrays that need flattening; each such element is used as a label. Comma separated,
127 //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
128 @Column(name = "`flatten_array_path`")
129 protected String flattenArrayPath;
/**
 * Returns this topic's name, which is the id of the associated {@code topicName}.
 */
131 public String getName() {
132 return topicName.getId();
/**
 * Returns the retention period of this topic.
 * The default is 3650, i.e. ten years expressed in days (the backing column is {@code ttl_day}).
 * NOTE(review): presumably the default applies only when no TTL is configured - confirm.
 */
135 public int getTtl() {
139 return 3650;//default to 10 years to be safe
/**
 * Returns the configured data format as a {@link DataFormat} value.
 * A non-null {@code dataFormat} string is converted with {@code DataFormat.fromString}.
 */
143 public DataFormat getDataFormat2() {
144 if (dataFormat != null) {
145 return DataFormat.fromString(dataFormat);
/**
 * Returns the individual paths held in the comma-separated {@code aggregateArrayPath}.
 * The value is split on ',' only when it is not blank.
 */
151 public String[] getAggregateArrayPath2() {
154 if (StringUtils.isNotBlank(aggregateArrayPath)) {
155 ret = aggregateArrayPath.split(",");
/**
 * Returns the individual paths held in the comma-separated {@code flattenArrayPath}.
 * The value is split on ',' only when it is not blank.
 */
161 public String[] getFlattenArrayPath2() {
164 if (StringUtils.isNotBlank(flattenArrayPath)) {
165 ret = flattenArrayPath.split(",");
171 //extract DB id from JSON attributes, support multiple attributes
/**
 * Builds the message id from the values found in {@code json} at the configured paths.
 * When {@code messageIdPath} is not blank it is split on ',' and each entry is passed to
 * {@code json.query}; the string form of every result is appended to a {@code StringBuilder}.
 * NOTE(review): the query result is dereferenced without a null check - confirm that
 * {@code json.query} cannot return null for a path that is absent from the message.
 *
 * @param json the message to read the id values from
 */
172 public String getMessageId(JSONObject json) {
175 if (StringUtils.isNotBlank(messageIdPath)) {
176 String[] paths = messageIdPath.split(",");
178 StringBuilder sb = new StringBuilder();
179 for (int i = 0; i < paths.length; i++) {
183 sb.append(json.query(paths[i]).toString());
191 public TopicConfig getTopicConfig() {
192 TopicConfig tConfig = new TopicConfig();
194 tConfig.setId(getId());
195 tConfig.setName(getName());
196 tConfig.setLogin(getLogin());
197 tConfig.setEnabled(isEnabled());
198 tConfig.setDataFormat(dataFormat);
199 tConfig.setSaveRaw(isSaveRaw());
200 tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
201 tConfig.setMessageIdPath(getMessageIdPath());
202 tConfig.setAggregateArrayPath(getAggregateArrayPath());
203 tConfig.setFlattenArrayPath(getFlattenArrayPath());
204 tConfig.setTtl(getTtl());
206 Set<Db> topicDb = getDbs();
207 List<Integer> dbList = new ArrayList<>();
208 List<Integer> enabledDbList = new ArrayList<>();
209 List<String> enabledDbList2 = new ArrayList<>();
210 if (topicDb != null) {
211 for (Db item : topicDb) {
212 dbList.add(item.getId());
213 if(item.isEnabled()) {
214 enabledDbList.add(item.getId());
215 enabledDbList2.add(item.getDbType().getId());
219 tConfig.setSinkdbs(dbList);
220 tConfig.setEnabledSinkdbs(enabledDbList);
221 Map<String,Integer> map = new HashMap<>();
222 for (String string : enabledDbList2) {
223 if(map.containsKey(string)) {
224 map.put(string, map.get(string).intValue()+1);
226 map.put(string, new Integer(1));
229 tConfig.setCountsDb(map);
231 Set<Kafka> topicKafka = getKafkas();
232 List<Integer> kafkaList = new ArrayList<>();
233 if (topicKafka != null) {
234 for (Kafka kafka : topicKafka) {
235 kafkaList.add(kafka.getId());
238 tConfig.setKafkas(kafkaList);
239 tConfig.setCountsKafka(kafkaList.size());
/**
 * Returns a one-line description made of the topic name, the enabled flag and the
 * {@code dbs} and {@code kafkas} sets.
 */
244 public String toString() {
245 return String.format("Topic %s (enabled=%s, dbs=%s, kafkas=%s)", topicName, enabled, dbs, kafkas);
/**
 * Compares topics by id: the runtime classes are checked first, then the ids are compared
 * with {@code equals}.
 * NOTE(review): {@code id.equals(...)} throws a NullPointerException when this topic's id is
 * null (presumably possible before the entity is persisted, since the id is generated) - confirm.
 *
 * @param obj the object to compare with
 */
249 public boolean equals(Object obj) {
253 if (this.getClass() != obj.getClass())
256 return id.equals(((Topic) obj).getId());
260 public int hashCode() {