Merge "Fix yamllint issues"
[dcaegen2/services.git] / components / datalake-handler / feeder / src / main / java / org / onap / datalake / feeder / service / StoreService.java
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DATALAKE
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21 package org.onap.datalake.feeder.service;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
26
27 import javax.annotation.PostConstruct;
28
29 import org.apache.commons.collections.CollectionUtils;
30 import org.apache.commons.lang3.StringUtils;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.json.JSONObject;
33 import org.json.XML;
34 import org.onap.datalake.feeder.config.ApplicationConfiguration;
35 import org.onap.datalake.feeder.dto.TopicConfig;
36 import org.onap.datalake.feeder.enumeration.DataFormat;
37 import org.onap.datalake.feeder.util.JsonUtil;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.stereotype.Service;
42
43 import com.fasterxml.jackson.databind.ObjectMapper;
44 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
45
46 /**
47  * Service to store messages to varieties of DBs
48  * 
49  * comment out YAML support, since AML is for config and don't see this data
50  * type in DMaaP. Do we need to support XML?
51  * 
52  * @author Guobiao Mo
53  *
54  */
55 @Service
56 public class StoreService {
57         private final Logger log = LoggerFactory.getLogger(this.getClass());
58
59         @Autowired
60         private ApplicationConfiguration config;
61
62         @Autowired
63         private TopicConfigPollingService configPollingService;
64
65         @Autowired
66         private MongodbService mongodbService;
67
68         @Autowired
69         private CouchbaseService couchbaseService;
70
71         @Autowired
72         private ElasticsearchService elasticsearchService;
73
74         @Autowired
75         private HdfsService hdfsService;
76
77         private ObjectMapper yamlReader;
78
79         @PostConstruct
80         private void init() {
81                 yamlReader = new ObjectMapper(new YAMLFactory());
82         }
83
84         public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
85                 if (CollectionUtils.isEmpty(messages)) {
86                         return;
87                 }
88
89                 TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
90
91                 List<JSONObject> docs = new ArrayList<>();
92
93                 for (Pair<Long, String> pair : messages) {
94                         try {
95                                 docs.add(messageToJson(topicConfig, pair));
96                         } catch (Exception e) {
97                                 //may see org.json.JSONException.
98                                 log.error("Error when converting this message to JSON: " + pair.getRight(), e);
99                         }
100                 }
101
102                 saveJsons(topicConfig, docs, messages);
103         }
104
105         private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
106
107                 long timestamp = pair.getLeft();
108                 String text = pair.getRight();
109
110                 //for debug, to be remove
111                 //              String topicStr = topic.getId();
112                 //              if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
113                 //              log.debug("{} ={}", topicStr, text);
114                 //}
115
116                 boolean storeRaw = topicConfig.isSaveRaw();
117
118                 JSONObject json = null;
119
120                 DataFormat dataFormat = topicConfig.getDataFormat2();
121
122                 switch (dataFormat) {
123                 case JSON:
124                         json = new JSONObject(text);
125                         break;
126                 case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON 
127                         json = XML.toJSONObject(text);
128                         break;
129                 case YAML:// Do we need to support YAML?
130                         Object obj = yamlReader.readValue(text, Object.class);
131                         ObjectMapper jsonWriter = new ObjectMapper();
132                         String jsonString = jsonWriter.writeValueAsString(obj);
133                         json = new JSONObject(jsonString);
134                         break;
135                 default:
136                         json = new JSONObject();
137                         storeRaw = true;
138                         break;
139                 }
140
141                 //FIXME for debug, to be remove
142                 json.remove("_id");
143                 json.remove("_dl_text_");
144                 json.remove("_dl_type_");
145
146                 json.put(config.getTimestampLabel(), timestamp);
147                 if (storeRaw) {
148                         json.put(config.getRawDataLabel(), text);
149                 }
150
151                 if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
152                         String[] paths = topicConfig.getAggregateArrayPath2();
153                         for (String path : paths) {
154                                 JsonUtil.arrayAggregate(path, json);
155                         }
156                 }
157
158                 if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
159                         String[] paths = topicConfig.getFlattenArrayPath2();
160                         for (String path : paths) {
161                                 JsonUtil.flattenArray(path, json);
162                         }
163                 }
164
165                 return json;
166         }
167
168         private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
169                 if (topic.supportMongoDB()) {
170                         mongodbService.saveJsons(topic, jsons);
171                 }
172
173                 if (topic.supportCouchbase()) {
174                         couchbaseService.saveJsons(topic, jsons);
175                 }
176
177                 if (topic.supportElasticsearch()) {
178                         elasticsearchService.saveJsons(topic, jsons);
179                 }
180
181                 if (topic.supportHdfs()) {
182                         hdfsService.saveMessages(topic, messages);
183                 }
184         }
185
186         public void flush() { //force flush all buffer 
187                 hdfsService.flush();
188         }
189
190         public void flushStall() { //flush stall buffer
191                 hdfsService.flushStall();
192         }
193 }