291f1cad3ac0eee0b99158cd515d89820fc0ed73
[dcaegen2/services.git] / components / datalake-handler / feeder / src / main / java / org / onap / datalake / feeder / service / StoreService.java
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DATALAKE
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21 package org.onap.datalake.feeder.service;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import javax.annotation.PostConstruct;
28
29 import org.apache.commons.collections.CollectionUtils;
30 import org.apache.commons.lang3.StringUtils;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.json.JSONObject;
33 import org.json.XML;
34 import org.onap.datalake.feeder.config.ApplicationConfiguration;
35 import org.onap.datalake.feeder.domain.Kafka;
36 import org.onap.datalake.feeder.dto.TopicConfig;
37 import org.onap.datalake.feeder.enumeration.DataFormat;
38 import org.onap.datalake.feeder.util.JsonUtil;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.stereotype.Service;
43
44 import com.fasterxml.jackson.databind.ObjectMapper;
45 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
46
47 /**
48  * Service to store messages to varieties of DBs
49  * 
50  * comment out YAML support, since AML is for config and don't see this data
51  * type in DMaaP. Do we need to support XML?
52  * 
53  * @author Guobiao Mo
54  *
55  */
56 @Service
57 public class StoreService {
58         private final Logger log = LoggerFactory.getLogger(this.getClass());
59
60         @Autowired
61         private ApplicationConfiguration config;
62
63         @Autowired
64         private TopicConfigPollingService configPollingService;
65
66         @Autowired
67         private MongodbService mongodbService;
68
69         @Autowired
70         private CouchbaseService couchbaseService;
71
72         @Autowired
73         private ElasticsearchService elasticsearchService;
74
75         @Autowired
76         private HdfsService hdfsService;
77
78         private ObjectMapper yamlReader;
79
80         @PostConstruct
81         private void init() {
82                 yamlReader = new ObjectMapper(new YAMLFactory());
83         }
84
85         public void saveMessages(Kafka kafka, String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
86                 if (CollectionUtils.isEmpty(messages)) {
87                         return;
88                 }
89
90                 TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
91
92                 List<JSONObject> docs = new ArrayList<>();
93
94                 for (Pair<Long, String> pair : messages) {
95                         try {
96                                 docs.add(messageToJson(topicConfig, pair));
97                         } catch (Exception e) {
98                                 //may see org.json.JSONException.
99                                 log.error("Error when converting this message to JSON: " + pair.getRight(), e);
100                         }
101                 }
102
103                 saveJsons(topicConfig, docs, messages);
104         }
105
106         private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
107
108                 long timestamp = pair.getLeft();
109                 String text = pair.getRight();
110
111                 //for debug, to be remove
112                 //              String topicStr = topic.getId();
113                 //              if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
114                 //              log.debug("{} ={}", topicStr, text);
115                 //}
116
117                 boolean storeRaw = topicConfig.isSaveRaw();
118
119                 JSONObject json = null;
120
121                 DataFormat dataFormat = topicConfig.getDataFormat2();
122
123                 switch (dataFormat) {
124                 case JSON:
125                         json = new JSONObject(text);
126                         break;
127                 case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON 
128                         json = XML.toJSONObject(text);
129                         break;
130                 case YAML:// Do we need to support YAML?
131                         Object obj = yamlReader.readValue(text, Object.class);
132                         ObjectMapper jsonWriter = new ObjectMapper();
133                         String jsonString = jsonWriter.writeValueAsString(obj);
134                         json = new JSONObject(jsonString);
135                         break;
136                 default:
137                         json = new JSONObject();
138                         storeRaw = true;
139                         break;
140                 }
141
142                 //FIXME for debug, to be remove
143                 json.remove("_id");
144                 json.remove("_dl_text_");
145                 json.remove("_dl_type_");
146
147                 json.put(config.getTimestampLabel(), timestamp);
148                 if (storeRaw) {
149                         json.put(config.getRawDataLabel(), text);
150                 }
151
152                 if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
153                         String[] paths = topicConfig.getAggregateArrayPath2();
154                         for (String path : paths) {
155                                 JsonUtil.arrayAggregate(path, json);
156                         }
157                 }
158
159                 if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
160                         String[] paths = topicConfig.getFlattenArrayPath2();
161                         for (String path : paths) {
162                                 JsonUtil.flattenArray(path, json);
163                         }
164                 }
165
166                 return json;
167         }
168
169         private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
170                 if (topic.supportMongoDB()) {
171                         mongodbService.saveJsons(topic, jsons);
172                 }
173
174                 if (topic.supportCouchbase()) {
175                         couchbaseService.saveJsons(topic, jsons);
176                 }
177
178                 if (topic.supportElasticsearch()) {
179                         elasticsearchService.saveJsons(topic, jsons);
180                 }
181
182                 if (topic.supportHdfs()) {
183                         hdfsService.saveMessages(topic, messages);
184                 }
185         }
186
187         public void flush() { //force flush all buffer 
188                 hdfsService.flush();
189         }
190
191         public void flushStall() { //flush stall buffer
192                 hdfsService.flushStall();
193         }
194 }