2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.onap.datalake.feeder.domain;
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
27 import javax.persistence.Column;
28 import javax.persistence.Entity;
29 import javax.persistence.FetchType;
30 import javax.persistence.Id;
31 import javax.persistence.JoinColumn;
32 import javax.persistence.JoinTable;
33 import javax.persistence.ManyToMany;
34 import javax.persistence.ManyToOne;
35 import javax.persistence.Table;
37 import org.apache.commons.lang3.StringUtils;
38 import org.json.JSONObject;
39 import org.onap.datalake.feeder.dto.TopicConfig;
40 import org.onap.datalake.feeder.enumeration.DataFormat;
42 import com.fasterxml.jackson.annotation.JsonBackReference;
48 * Domain class representing topic
56 @Table(name = "topic")
59 @Column(name = "`id`")
62 @ManyToOne(fetch = FetchType.EAGER)
63 @JoinColumn(name = "topic_name_id", nullable = false)
64 private TopicName topicName;//topic name
66 //for protected Kafka topics
67 @Column(name = "`login`")
70 @Column(name = "`pass`")
73 //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
75 //@JsonManagedReference
76 @ManyToMany(fetch = FetchType.EAGER)
77 @JoinTable(name = "map_db_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "db_id") })
78 protected Set<Db> dbs=new HashSet<>();
80 @ManyToMany(fetch = FetchType.EAGER)
81 @JoinTable(name = "map_kafka_topic", joinColumns = { @JoinColumn(name = "topic_id") }, inverseJoinColumns = { @JoinColumn(name = "kafka_id") })
82 protected Set<Kafka> kafkas=new HashSet<>();
85 * indicate if we should monitor this topic
87 @Column(name = "`enabled`", nullable = false)
88 private boolean enabled;
91 * save raw message text
93 @Column(name = "`save_raw`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
94 private boolean saveRaw;
97 * need to explicitly tell feeder the data format of the message. support JSON,
100 @Column(name = "`data_format`")
101 protected String dataFormat;
106 @Column(name = "`ttl_day`")
109 //if this flag is true, need to correlate alarm cleared message to previous alarm
110 @Column(name = "`correlate_cleared_message`", nullable = false, columnDefinition = " bit(1) DEFAULT 0")
111 private boolean correlateClearedMessage;
113 //paths to the values in the JSON that are used to composite DB id, comma separated, example: "/event-header/id,/event-header/entity-type,/entity/product-name"
114 @Column(name = "`message_id_path`")
115 protected String messageIdPath;
117 //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
118 @Column(name = "`aggregate_array_path`")
119 protected String aggregateArrayPath;
121 //paths to the element in array that need flatten, this element is used as label, comma separated,
122 //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
123 @Column(name = "`flatten_array_path`")
124 protected String flattenArrayPath;
126 public String getName() {
127 return topicName.getId();
130 public int getTtl() {
134 return 3650;//default to 10 years for safe
138 public DataFormat getDataFormat2() {
139 if (dataFormat != null) {
140 return DataFormat.fromString(dataFormat);
146 public String[] getAggregateArrayPath2() {
149 if (StringUtils.isNotBlank(aggregateArrayPath)) {
150 ret = aggregateArrayPath.split(",");
156 public String[] getFlattenArrayPath2() {
159 if (StringUtils.isNotBlank(flattenArrayPath)) {
160 ret = flattenArrayPath.split(",");
166 //extract DB id from JSON attributes, support multiple attributes
167 public String getMessageId(JSONObject json) {
170 if (StringUtils.isNotBlank(messageIdPath)) {
171 String[] paths = messageIdPath.split(",");
173 StringBuilder sb = new StringBuilder();
174 for (int i = 0; i < paths.length; i++) {
178 sb.append(json.query(paths[i]).toString());
186 public TopicConfig getTopicConfig() {
187 TopicConfig tConfig = new TopicConfig();
189 tConfig.setId(getId());
190 tConfig.setName(getName());
191 tConfig.setLogin(getLogin());
192 tConfig.setEnabled(isEnabled());
193 tConfig.setDataFormat(dataFormat);
194 tConfig.setSaveRaw(isSaveRaw());
195 tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
196 tConfig.setMessageIdPath(getMessageIdPath());
197 tConfig.setAggregateArrayPath(getAggregateArrayPath());
198 tConfig.setFlattenArrayPath(getFlattenArrayPath());
199 tConfig.setTtl(getTtl());
201 Set<Db> topicDb = getDbs();
202 List<String> dbList = new ArrayList<>();
203 List<String> enabledDbList = new ArrayList<>();
204 if (topicDb != null) {
205 for (Db item : topicDb) {
206 dbList.add(item.getName());
207 if(item.isEnabled()) {
208 enabledDbList.add(item.getName());
212 tConfig.setSinkdbs(dbList);
213 tConfig.setEnabledSinkdbs(enabledDbList);
215 Set<Kafka> topicKafka = getKafkas();
216 List<Integer> kafkaList = new ArrayList<>();
217 if (topicKafka != null) {
218 for (Kafka kafka : topicKafka) {
219 kafkaList.add(kafka.getId());
222 tConfig.setKafkas(kafkaList);
227 public String toString() {
228 return String.format("Topic %s (enabled=%s, dbs=%s, kafkas=%s)", topicName, enabled, dbs, kafkas);
232 public boolean equals(Object obj) {
236 if (this.getClass() != obj.getClass())
239 return id.equals(((Topic) obj).getId());
243 public int hashCode() {