/*
 * ============LICENSE_START=======================================================
 * ONAP : DATALAKE
 * ================================================================================
 * Copyright 2019 China Mobile
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.datalake.feeder.service.db;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ShutdownHookManager;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

import lombok.Getter;
import lombok.Setter;

/**
 * Service to write data to HDFS.
 *
 * @author Guobiao Mo
 *
 */
@Service
@Scope("prototype")
public class HdfsService implements DbStoreService {

        private final Logger log = LoggerFactory.getLogger(this.getClass());

        private Db hdfs;

        @Autowired
        ApplicationConfiguration config;

        FileSystem fileSystem;
        private boolean isReady = false;

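        //per-thread state: each puller thread buffers and flushes independently,
        //and SimpleDateFormat is not thread-safe, so every thread gets its own
        //formatter instances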
        private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
        private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
        private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));

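        /**
         * Per-topic, per-thread buffer: accumulates messages and remembers the
         * time of the last flush; written out to HDFS in batches.
         */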
        @Setter
        @Getter
        private class Buffer {
                long lastFlush;
                List<String> data;

                public Buffer() {
                        lastFlush = Long.MIN_VALUE;
                        data = new ArrayList<>();
                }

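                //write buffered messages to HDFS, then clear the buffer and record the flush time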
                public void flush(String topic) {
                        try {
                                if (!data.isEmpty()) {
                                        saveMessages(topic, data);
                                        data.clear();
                                        lastFlush = System.currentTimeMillis();
                                }
                        } catch (IOException e) {
                                //log the full exception so the stack trace is not lost
                                log.error("{} error saving to HDFS.", topic, e);
                        }
                }

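                //flush only if the buffer has been idle longer than the configured flush interval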
                public void flushStall(String topic) {
                        if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
                                log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
                                flush(topic);
                        }
                }

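                //buffer raw messages for a later flush to HDFS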
                public void addData(List<Pair<Long, String>> messages) {
                        if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
                                lastFlush = System.currentTimeMillis();
                        }

                        messages.forEach(message -> data.add(message.getRight())); //note that the left element of the pair is not used
                }

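                //buffer JSON messages, serialized to their string form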
                public void addJsonData(List<JSONObject> messages) {
                        if (data.isEmpty()) { //reset the last flush time stamp to current if no existing data in buffer
                                lastFlush = System.currentTimeMillis();
                        }

                        messages.forEach(message -> data.add(message.toString()));
                }

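                //write one batch to a new HDFS file under /datalake/<topic>/<day>/;
                //the file name includes timestamp, host name and thread id, so
                //concurrent writers on different hosts or threads never collide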
                private void saveMessages(String topic, List<String> bufferList) throws IOException {

                        long thread = Thread.currentThread().getId();
                        Date date = new Date();
                        String day = dayFormat.get().format(date);
                        String time = timeFormat.get().format(date);

                        InetAddress inetAddress = InetAddress.getLocalHost();
                        String hostName = inetAddress.getHostName();

                        String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
                        Path path = new Path(filePath);
                        log.debug("writing {} to HDFS {}", bufferList.size(), filePath);

                        // Create a new file and write data to it; try-with-resources closes
                        // the stream even if a write fails.
                        try (FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize())) {
                                for (String message : bufferList) {
                                        //write plain UTF-8 bytes; writeUTF() would prepend a 2-byte
                                        //length and use modified UTF-8, corrupting the text file
                                        out.write(message.getBytes(StandardCharsets.UTF_8));
                                        out.write('\n');
                                }
                        }

                        log.debug("Done writing {} to HDFS {}", bufferList.size(), filePath);
                }
        }

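        //one HdfsService is created per HDFS Db (see the prototype scope above)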
        public HdfsService(Db db) {
                hdfs = db;
        }

        @PostConstruct
        private void init() {
                // Initialize the HDFS connection
                try {
                        //Get configuration of Hadoop system
                        Configuration hdfsConfig = new Configuration();

                        int port = hdfs.getPort() == null ? 8020 : hdfs.getPort(); //8020 is the default NameNode IPC port

                        String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
                        hdfsConfig.set("fs.defaultFS", hdfsuri);
                        System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());

                        log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());

                        fileSystem = FileSystem.get(hdfsConfig);

                        //disable the Hadoop shutdown hook: we need the HDFS connection to
                        //stay open during our own shutdown so cleanUp() can flush data
                        ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
                        hadoopShutdownHookManager.clearShutdownHooks();

                        isReady = true;
                } catch (Exception ex) {
                        log.error("error connecting to HDFS.", ex);
                        isReady = false;
                }
        }

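        //on shutdown: flush any remaining buffered data, then close the HDFS
        //connection, holding the shared shutdown lock while doing so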
        @PreDestroy
        public void cleanUp() {
                config.getShutdownLock().readLock().lock();

                try {
                        log.info("fileSystem.close() at cleanUp.");
                        flush();
                        fileSystem.close();
                } catch (IOException e) {
                        log.error("error closing fileSystem at cleanUp.", e);
                } finally {
                        config.getShutdownLock().readLock().unlock();
                }
        }

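        //force-flush every topic buffer of the calling thread, regardless of stall time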
        public void flush() {
                log.info("Force flush ALL data, regardless of stall");
                bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
        }

        //if no new data has arrived for a topic for a while, flush its buffer
        public void flushStall() {
                log.debug("Flush stall data");
                bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
        }


        //used if raw data should be saved
        public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
                String topicStr = topic.getName();

                Map<String, Buffer> bufferMap = bufferLocal.get();
                final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

                buffer.addData(messages);

                if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
                        buffer.flush(topicStr);
                } else {
                        log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
                }
        }

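        //save messages that have already been parsed into JSON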
        @Override
        public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
                String topicStr = topic.getName();

                Map<String, Buffer> bufferMap = bufferLocal.get();
                final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

                buffer.addJsonData(jsons);

                if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
                        buffer.flush(topicStr);
                } else {
                        log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
                }
        }
}