/*
 * ============LICENSE_START=======================================================
 * ONAP : DATALAKE
 * ================================================================================
 * Copyright 2019 China Mobile
 * =================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.datalake.feeder.service.db;
 
import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ShutdownHookManager;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
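/*
 * A minimal usage sketch with hypothetical values. The Db setters shown are
 * assumed to mirror the getters used in init(); in the DataLake feeder this
 * service is normally created and driven by the store layer rather than
 * called directly.
 *
 *   Db db = new Db();
 *   db.setHost("hdfs-namenode"); // assumed setter, mirrors hdfs.getHost()
 *   db.setPort(8020);            // assumed setter; a null port defaults to 8020 in init()
 *   db.setLogin("dl");           // assumed setter, mirrors hdfs.getLogin()
 *
 *   HdfsService hdfsService = new HdfsService(db);
 *   // init() runs via @PostConstruct once Spring manages the bean
 *   hdfsService.saveJsons(effectiveTopic, jsons); // effectiveTopic and jsons prepared by the caller
 *   hdfsService.flush(); // force-write anything still buffered
 */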
 
/**
 * Service to write data to HDFS
 */
@Service
public class HdfsService implements DbStoreService {
 
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    //connection settings of the target HDFS cluster, supplied via constructor
    private Db hdfs;

    @Autowired
    ApplicationConfiguration config;

    FileSystem fileSystem;
    private boolean isReady = false;

    private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
    private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
    private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));
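    // Note: the three ThreadLocal fields above are deliberate: each consumer
    // thread keeps its own per-topic buffers, so buffering and flushing need
    // no locking, and SimpleDateFormat instances are not safe to share
    // between threads.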
 
    //per-topic buffer of pending messages, bound to the current thread
    private class Buffer {
        long lastFlush;
        List<String> data;

        Buffer() {
            lastFlush = Long.MIN_VALUE;
            data = new ArrayList<>();
        }

        public List<String> getData() {
            return data;
        }
 
        public void flush(String topic) {
            try {
                if (!data.isEmpty()) {
                    saveMessages(topic, data);
                    data.clear(); //drop saved data so it is not written twice
                    lastFlush = System.currentTimeMillis();
                }
            } catch (IOException e) {
                log.error("{} error saving to HDFS. {}", topic, e.getMessage());
            }
        }
 
        public void flushStall(String topic) {
            if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
                log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
                flush(topic);
            }
        }
 
        public void addData(List<Pair<Long, String>> messages) {
            if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
                lastFlush = System.currentTimeMillis();
            }

            messages.stream().forEach(message -> data.add(message.getRight())); //note that the left (key) side of the pair is not used
        }
 
        public void addData2(List<JSONObject> messages) {
            if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
                lastFlush = System.currentTimeMillis();
            }

            messages.stream().forEach(message -> data.add(message.toString()));
        }
 
        private void saveMessages(String topic, List<String> bufferList) throws IOException {

            long thread = Thread.currentThread().getId();
            Date date = new Date();
            String day = dayFormat.get().format(date);
            String time = timeFormat.get().format(date);

            InetAddress inetAddress = InetAddress.getLocalHost();
            String hostName = inetAddress.getHostName();
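            // The path built below yields one file per topic, day, and flush,
            // e.g. (hypothetical values):
            //   /datalake/MY_TOPIC/2019-07-31/2019-07-31-12-34-56-789-myhost-42
            // hostname and thread id keep concurrent writers from colliding.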
 
            String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
            Path path = new Path(filePath);
            log.debug("writing {} to HDFS {}", bufferList.size(), filePath);

            // Create a new file and write data to it.
            FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize());

            bufferList.stream().forEach(message -> {
                try {
                    out.writeUTF(message);
                    out.write('\n');
                } catch (IOException e) {
                    log.error("error writing to HDFS. {}", e.getMessage());
                }
            });

            out.close();
            log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath);
        }
    }
 
    public HdfsService() {
    }

    public HdfsService(Db db) {
        hdfs = db;
    }
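    // The no-arg constructor is assumed to exist for the bean framework; the
    // Db-arg constructor is how callers bind this service to a specific HDFS
    // endpoint before init() connects.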
 
    @PostConstruct
    private void init() {
        // Initialize HDFS Connection
        try {
            //Get configuration of Hadoop system
            Configuration hdfsConfig = new Configuration();

            int port = hdfs.getPort() == null ? 8020 : hdfs.getPort();

            String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
            hdfsConfig.set("fs.defaultFS", hdfsuri);
            System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());

            log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());

            fileSystem = FileSystem.get(hdfsConfig);

            //disable Hadoop Shutdown Hook, we need the HDFS connection to flush data
            ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
            hadoopShutdownHookManager.clearShutdownHooks();
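            // With Hadoop's own shutdown hooks cleared, the JVM will not close
            // the FileSystem behind our back at shutdown; cleanUp() below
            // closes it explicitly after the final flush().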
 
            isReady = true;
        } catch (Exception ex) {
            log.error("error connecting to HDFS.", ex);
            isReady = false;
        }
    }
 
    @PreDestroy
    public void cleanUp() {
        config.getShutdownLock().readLock().lock();

        try {
            log.info("fileSystem.close() at cleanUp.");
            flush(); //write out any remaining buffered data before closing
            fileSystem.close();
        } catch (IOException e) {
            log.error("fileSystem.close() at cleanUp.", e);
        } finally {
            config.getShutdownLock().readLock().unlock();
        }
    }
 
    public void flush() {
        log.info("Force flush ALL data, regardless of stall");
        bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
    }
 
    //if no new data comes in for a topic for a while, need to flush its buffer
    public void flushStall() {
        log.debug("Flush stall data");
        bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
    }
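    // flushStall() only helps if it is invoked on a timer; it is assumed the
    // feeder's store loop calls it periodically, with staleness judged against
    // config.getHdfsFlushInterval() via Util.isStall().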
 
    //used if raw data should be saved
    public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
        String topicStr = topic.getName();

        Map<String, Buffer> bufferMap = bufferLocal.get();
        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

        buffer.addData(messages);

        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
            buffer.flush(topicStr);
        } else {
            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
        }
    }
 
    @Override
    public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
        String topicStr = topic.getName();

        Map<String, Buffer> bufferMap = bufferLocal.get();
        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

        buffer.addData2(jsons);

        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
            buffer.flush(topicStr);
        } else {
            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
        }
    }
}