/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright 2019 China Mobile
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.datalake.feeder.service.db;
import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ShutdownHookManager;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

/**
 * Service to write data to HDFS.
 */
@Service
@Scope("prototype")
public class HdfsService implements DbStoreService {
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    //the Db entity describing the HDFS endpoint this service writes to
    private Db hdfs;

    @Autowired
    ApplicationConfiguration config;

    FileSystem fileSystem;
    private boolean isReady = false;
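
    // Each worker thread keeps its own per-topic buffers and date formatters in
    // ThreadLocals: buffers are flushed per thread, and SimpleDateFormat is not
    // thread-safe, so nothing here is shared across threads.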
    private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
    private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
    private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
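
    /**
     * Per-topic message buffer. Messages accumulate until the configured batch
     * size is reached, or until the buffer stalls, and are then written to HDFS
     * as one file.
     */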
    private class Buffer {
        long lastFlush;
        List<String> data;

        Buffer() {
            lastFlush = Long.MIN_VALUE;
            data = new ArrayList<>();
        }

        public List<String> getData() {
            return data;
        }
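
        //write all buffered messages for this topic to HDFS, then reset the buffer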
        public void flush(String topic) {
            try {
                if (!data.isEmpty()) {
                    saveMessages(topic, data);
                    data.clear();
                    lastFlush = System.currentTimeMillis();
                }
            } catch (IOException e) {
                log.error("{} error saving to HDFS. {}", topic, e.getMessage());
            }
        }
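
        //flush only if data has been sitting in the buffer longer than the configured flush interval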
        public void flushStall(String topic) {
            if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
                log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
                flush(topic);
            }
        }
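
        //buffer raw string payloads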
        public void addData(List<Pair<Long, String>> messages) {
            if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
                lastFlush = System.currentTimeMillis();
            }

            messages.forEach(message -> data.add(message.getRight())); //note that the left element of the pair is not used
        }
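
        //buffer messages that have already been parsed into JSON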
        public void addData2(List<JSONObject> messages) {
            if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
                lastFlush = System.currentTimeMillis();
            }

            messages.forEach(message -> data.add(message.toString()));
        }
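
        //write one batch to a new HDFS file /datalake/<topic>/<day>/<time>-<host>-<thread>;
        //the host name and thread id in the file name keep concurrent writers from colliding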
        private void saveMessages(String topic, List<String> bufferList) throws IOException {
            long thread = Thread.currentThread().getId();
            Date date = new Date();
            String day = dayFormat.get().format(date);
            String time = timeFormat.get().format(date);

            InetAddress inetAddress = InetAddress.getLocalHost();
            String hostName = inetAddress.getHostName();

            String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
            Path path = new Path(filePath);
            log.debug("writing {} to HDFS {}", bufferList.size(), filePath);

            // Create a new file and write data to it.
            FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());
            bufferList.forEach(message -> {
                try {
                    out.writeUTF(message);
                } catch (IOException e) {
                    log.error("error writing to HDFS. {}", e.getMessage());
                }
            });

            out.close();
            log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath);
        }
    }
    public HdfsService(Db db) {
        hdfs = db;
    }

    @PostConstruct
    private void init() {
        // Initialize HDFS Connection
        try {
            //Get configuration of Hadoop system
            Configuration hdfsConfig = new Configuration();

            int port = hdfs.getPort() == null ? 8020 : hdfs.getPort();

            String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
            hdfsConfig.set("fs.defaultFS", hdfsuri);
            System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());

            log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());

            fileSystem = FileSystem.get(hdfsConfig);

            //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data
            ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
            hadoopShutdownHookManager.clearShutdownHooks();

            isReady = true;
        } catch (Exception ex) {
            log.error("error connecting to HDFS.", ex);
            isReady = false;
        }
    }
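
    //flush remaining data and close the HDFS connection at shutdown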
    @PreDestroy
    public void cleanUp() {
        config.getShutdownLock().readLock().lock();

        try {
            log.info("fileSystem.close() at cleanUp.");
            flush();
            fileSystem.close();
        } catch (IOException e) {
            log.error("fileSystem.close() at cleanUp.", e);
        } finally {
            config.getShutdownLock().readLock().unlock();
        }
    }
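
    //force-flush every topic buffer held by the current thread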
    public void flush() {
        log.info("Force flush ALL data, regardless of stall");
        bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
    }

    //if no new data comes in for a topic for a while, need to flush its buffer
    public void flushStall() {
        log.debug("Flush stall data");
        bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
    }

    //used if raw data should be saved
    @Override
    public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
        String topicStr = topic.getName();

        Map<String, Buffer> bufferMap = bufferLocal.get();
        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

        buffer.addData(messages);

        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
            buffer.flush(topicStr);
        } else {
            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
        }
    }
    @Override
    public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
        String topicStr = topic.getName();

        Map<String, Buffer> bufferMap = bufferLocal.get();
        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

        buffer.addData2(jsons);

        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
            buffer.flush(topicStr);
        } else {
            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
        }
    }
}