/*
 * ============LICENSE_START=======================================================
 * ONAP : DataLake
 * ================================================================================
 * Copyright 2019 China Mobile
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.datalake.feeder.service.db;

import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ShutdownHookManager;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Service to write data to HDFS.
 */
@Service
public class HdfsService implements DbStoreService {
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private Db hdfs;

    @Autowired
    ApplicationConfiguration config;
    FileSystem fileSystem;
    private boolean isReady = false;

    private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
    private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
    private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
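    /**
     * Per-thread, per-topic write buffer. Messages accumulate here and are
     * written to HDFS in batches: either when the buffer reaches
     * hdfsBatchSize, or when flushStall() finds the buffer has been idle
     * longer than hdfsFlushInterval.
     */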
    private class Buffer {
        long lastFlush;
        List<String> data;

        Buffer() {
            lastFlush = Long.MIN_VALUE;
            data = new ArrayList<>();
        }

        List<String> getData() {
            return data;
        }
        public void flush(String topic) {
            try {
                if (!data.isEmpty()) {
                    saveMessages(topic, data);
                    data.clear();
                    lastFlush = System.currentTimeMillis();
                }
            } catch (IOException e) {
                log.error("{} error saving to HDFS. {}", topic, e.getMessage());
            }
        }
        public void flushStall(String topic) {
            if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
                log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
                flush(topic);
            }
        }
        public void addData(List<Pair<Long, String>> messages) {
            if (data.isEmpty()) { //reset the last flush timestamp to now if there is no existing data in the buffer
                lastFlush = System.currentTimeMillis();
            }

            messages.stream().forEach(message -> data.add(message.getRight())); //only the payload (right) is stored; the message key (left) is not used
        }
        public void addData2(List<JSONObject> messages) {
            if (data.isEmpty()) { //reset the last flush timestamp to now if there is no existing data in the buffer
                lastFlush = System.currentTimeMillis();
            }

            messages.stream().forEach(message -> data.add(message.toString()));
        }
        private void saveMessages(String topic, List<String> bufferList) throws IOException {
            long thread = Thread.currentThread().getId();
            Date date = new Date();
            String day = dayFormat.get().format(date);
            String time = timeFormat.get().format(date);

            InetAddress inetAddress = InetAddress.getLocalHost();
            String hostName = inetAddress.getHostName();

            String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
            Path path = new Path(filePath);
            log.debug("writing {} to HDFS {}", bufferList.size(), filePath);

            // Create a new file and write data to it.
            FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());

            bufferList.stream().forEach(message -> {
                try {
                    out.writeUTF(message);
                } catch (IOException e) {
                    log.error("error writing to HDFS. {}", e.getMessage());
                }
            });

            out.close();
            log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath);
        }
    }
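    // The no-arg constructor is for the Spring container; the Db-based
    // constructor supplies the HDFS connection details (host, port, login)
    // used in init() below.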
    public HdfsService() {
    }
    public HdfsService(Db db) {
        hdfs = db;
    }
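    /**
     * Builds the HDFS client after dependency injection. Hadoop's own
     * shutdown hooks are cleared so the connection stays usable during this
     * service's shutdown, when cleanUp() flushes any remaining buffered data.
     */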
    @PostConstruct
    private void init() {
        // Initialize HDFS connection
        try {
            //Get configuration of Hadoop system
            Configuration hdfsConfig = new Configuration();

            int port = hdfs.getPort() == null ? 8020 : hdfs.getPort(); //default HDFS NameNode port

            String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
            hdfsConfig.set("fs.defaultFS", hdfsuri);
            System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());

            log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());

            fileSystem = FileSystem.get(hdfsConfig);

            //disable Hadoop shutdown hook, we need the HDFS connection to flush data
            ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
            hadoopShutdownHookManager.clearShutdownHooks();

            isReady = true;
        } catch (Exception ex) {
            log.error("error connecting to HDFS.", ex);
            isReady = false;
        }
    }
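    // On shutdown: flush remaining buffered data and close the HDFS
    // connection while holding the application's shutdown lock.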
    @PreDestroy
    public void cleanUp() {
        config.getShutdownLock().readLock().lock();
        try {
            log.info("fileSystem.close() at cleanUp.");
            flush();
            fileSystem.close();
        } catch (IOException e) {
            log.error("fileSystem.close() at cleanUp.", e);
        } finally {
            config.getShutdownLock().readLock().unlock();
        }
    }
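    // Note: buffers live in a ThreadLocal, so flush() and flushStall() only
    // see the buffers of the calling thread; each consumer thread is expected
    // to flush its own buffers.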
    public void flush() {
        log.info("Force flush ALL data, regardless of stall");
        bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
    }
    //if no new data has come in for a topic for a while, flush its buffer
    public void flushStall() {
        log.debug("Flush stall data");
        bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
    }
    //used if raw data should be saved
    @Override
    public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
        String topicStr = topic.getName();

        Map<String, Buffer> bufferMap = bufferLocal.get();
        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

        buffer.addData(messages);

        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
            buffer.flush(topicStr);
        } else {
            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
        }
    }
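    //used if the data has already been parsed into JSON objects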
    @Override
    public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
        String topicStr = topic.getName();

        Map<String, Buffer> bufferMap = bufferLocal.get();
        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

        buffer.addData2(jsons);

        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
            buffer.flush(topicStr);
        } else {
            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
        }
    }
}