2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.datalake.feeder.service;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.List;
27 import javax.annotation.PostConstruct;
29 import org.apache.commons.collections.CollectionUtils;
30 import org.apache.commons.lang3.StringUtils;
31 import org.apache.commons.lang3.tuple.Pair;
32 import org.json.JSONObject;
34 import org.onap.datalake.feeder.config.ApplicationConfiguration;
35 import org.onap.datalake.feeder.domain.Kafka;
36 import org.onap.datalake.feeder.dto.TopicConfig;
37 import org.onap.datalake.feeder.enumeration.DataFormat;
38 import org.onap.datalake.feeder.util.JsonUtil;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.stereotype.Service;
44 import com.fasterxml.jackson.databind.ObjectMapper;
45 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
48 * Service to store messages to varieties of DBs
50 * comment out YAML support, since YAML is for config and we don't see this data
51 * type in DMaaP. Do we need to support XML?
57 public class StoreService {
// Logger named after the runtime class of this instance.
58 private final Logger log = LoggerFactory.getLogger(this.getClass());
// Supplies the timestamp and raw-data field labels that messageToJson writes into each document.
61 private ApplicationConfiguration config;
// Resolves the effective TopicConfig for a topic name (used by saveMessages).
64 private TopicConfigPollingService configPollingService;
// Storage back ends that saveJsons dispatches to; each one is only called when the
// topic reports support for it. hdfsService is also used by flushStall.
67 private MongodbService mongodbService;
70 private CouchbaseService couchbaseService;
73 private ElasticsearchService elasticsearchService;
76 private HdfsService hdfsService;
// Jackson mapper backed by a YAMLFactory; messageToJson uses it to parse YAML message text.
78 private ObjectMapper yamlReader;
// NOTE(review): the header of the enclosing initializer is not visible in this excerpt;
// PostConstruct is imported, so presumably this runs once after injection - confirm.
82 yamlReader = new ObjectMapper(new YAMLFactory());
85 public void saveMessages(Kafka kafka, String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
86 if (CollectionUtils.isEmpty(messages)) {
90 TopicConfig topicConfig = configPollingService.getEffectiveTopicConfig(topicStr);
92 List<JSONObject> docs = new ArrayList<>();
94 for (Pair<Long, String> pair : messages) {
96 docs.add(messageToJson(topicConfig, pair));
97 } catch (Exception e) {
98 //may see org.json.JSONException.
99 log.error("Error when converting this message to JSON: " + pair.getRight(), e);
103 saveJsons(topicConfig, docs, messages);
// Builds the JSONObject for one message: parses the text according to the topic's data
// format, writes the timestamp (and raw text) under the labels supplied by config, then
// applies the topic's array aggregation and array flattening paths.
// pair: left = timestamp, right = message text.
// Declares IOException; the Jackson calls in the YAML branch are the visible candidates
// for throwing it - TODO confirm.
106 private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
108 long timestamp = pair.getLeft();
109 String text = pair.getRight();
111 //for debug, to be removed
112 // String topicStr = topic.getId();
113 // if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
114 // log.debug("{} ={}", topicStr, text);
// Topic-level flag read here; presumably it decides whether the raw text is stored
// below - the guarding line is not visible in this excerpt, confirm.
117 boolean storeRaw = topicConfig.isSaveRaw();
119 JSONObject json = null;
121 DataFormat dataFormat = topicConfig.getDataFormat2();
// One parsing strategy per data format.
123 switch (dataFormat) {
// Text parsed directly by org.json.
125 json = new JSONObject(text);
127 case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON
128 json = XML.toJSONObject(text);
// YAML is read into a generic object, serialized to a JSON string by a fresh
// ObjectMapper, and that string is then parsed by org.json.
130 case YAML:// Do we need to support YAML?
131 Object obj = yamlReader.readValue(text, Object.class);
132 ObjectMapper jsonWriter = new ObjectMapper();
133 String jsonString = jsonWriter.writeValueAsString(obj);
134 json = new JSONObject(jsonString);
// Starts from an empty document instead of parsing the text.
137 json = new JSONObject();
142 //FIXME for debug, to be removed
144 json.remove("_dl_text_");
145 json.remove("_dl_type_");
// Timestamp of the message, stored under the configured timestamp label.
147 json.put(config.getTimestampLabel(), timestamp);
// Original message text, stored under the configured raw-data label.
149 json.put(config.getRawDataLabel(), text);
// Apply JsonUtil.arrayAggregate once per configured aggregate path.
152 if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
153 String[] paths = topicConfig.getAggregateArrayPath2();
154 for (String path : paths) {
155 JsonUtil.arrayAggregate(path, json);
// Apply JsonUtil.flattenArray once per configured flatten path.
159 if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
160 String[] paths = topicConfig.getFlattenArrayPath2();
161 for (String path : paths) {
162 JsonUtil.flattenArray(path, json);
169 private void saveJsons(TopicConfig topic, List<JSONObject> jsons, List<Pair<Long, String>> messages) {
170 if (topic.supportMongoDB()) {
171 mongodbService.saveJsons(topic, jsons);
174 if (topic.supportCouchbase()) {
175 couchbaseService.saveJsons(topic, jsons);
178 if (topic.supportElasticsearch()) {
179 elasticsearchService.saveJsons(topic, jsons);
182 if (topic.supportHdfs()) {
183 hdfsService.saveMessages(topic, messages);
// NOTE(review): the body of flush() is not visible in this excerpt - confirm which
// services it flushes.
187 public void flush() { //force flush all buffer
// Delegates to hdfsService.flushStall(); no other store is touched here.
191 public void flushStall() { //flush stall buffer
192 hdfsService.flushStall();