/*
 * ============LICENSE_START=======================================================
 * ONAP : DATALAKE
 * ================================================================================
 * Copyright 2019 China Mobile
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.datalake.feeder.service.db;

import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ShutdownHookManager;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import lombok.Getter;
import lombok.Setter;
/**
 * Service to write data to HDFS. Incoming messages are buffered per thread
 * and per topic, then written to HDFS in batches.
 *
 * @author Guobiao Mo
 */
@Service
public class HdfsService implements DbStoreService {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private Db hdfs;

    @Autowired
    ApplicationConfiguration config;

    FileSystem fileSystem;
    private boolean isReady = false;

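    // SimpleDateFormat is not thread-safe, and each writing thread keeps its
    // own per-topic buffers, so this mutable state is held in ThreadLocals to
    // avoid synchronization on the write path.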
    private ThreadLocal<Map<String, Buffer>> bufferLocal = ThreadLocal.withInitial(HashMap::new);
    private ThreadLocal<SimpleDateFormat> dayFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
    private ThreadLocal<SimpleDateFormat> timeFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-SSS"));

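    /**
     * Per-topic accumulation buffer. Messages are collected until either the
     * configured batch size is reached or the buffer stalls (no flush within
     * the configured interval), then written to HDFS as one file.
     */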
    @Setter
    @Getter
    private class Buffer {
        long lastFlush;
        List<String> data;

        public Buffer() {
            lastFlush = Long.MIN_VALUE;
            data = new ArrayList<>();
        }

        public void flush(String topic) {
            try {
                if (!data.isEmpty()) {
                    saveMessages(topic, data);
                    data.clear();
                    lastFlush = System.currentTimeMillis();
                }
            } catch (IOException e) {
                log.error("{} error saving to HDFS.", topic, e);
            }
        }

        public void flushStall(String topic) {
            if (!data.isEmpty() && Util.isStall(lastFlush, config.getHdfsFlushInterval())) {
                log.debug("going to flushStall topic={}, buffer size={}", topic, data.size());
                flush(topic);
            }
        }

        public void addData(List<Pair<Long, String>> messages) {
            if (data.isEmpty()) { // reset the last flush timestamp to now if the buffer was empty
                lastFlush = System.currentTimeMillis();
            }

            messages.forEach(message -> data.add(message.getRight())); // the pair's left element is not used
        }

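        // Same as addData, but for already-parsed JSON messages; each
        // JSONObject is stored in the buffer as its serialized string form.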
        public void addData2(List<JSONObject> messages) {
            if (data.isEmpty()) { // reset the last flush timestamp to now if the buffer was empty
                lastFlush = System.currentTimeMillis();
            }

            messages.forEach(message -> data.add(message.toString()));
        }

        private void saveMessages(String topic, List<String> bufferList) throws IOException {

            long thread = Thread.currentThread().getId();
            Date date = new Date();
            String day = dayFormat.get().format(date);
            String time = timeFormat.get().format(date);

            InetAddress inetAddress = InetAddress.getLocalHost();
            String hostName = inetAddress.getHostName();

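            // One file per batch under /datalake/<topic>/<day>/; the file name
            // combines timestamp, host name and thread id so that concurrent
            // writers on multiple hosts never collide on the same path.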
            String filePath = String.format("/datalake/%s/%s/%s-%s-%s", topic, day, time, hostName, thread);
            Path path = new Path(filePath);
            log.debug("writing {} messages to HDFS {}", bufferList.size(), filePath);

            // Create a new file and write data to it; try-with-resources ensures
            // the stream is closed even if a write fails.
            try (FSDataOutputStream out = fileSystem.create(path, true, config.getHdfsBufferSize())) {
                bufferList.forEach(message -> {
                    try {
                        // Note: writeUTF prepends a two-byte length and uses modified
                        // UTF-8, per the java.io.DataOutput contract.
                        out.writeUTF(message);
                        out.write('\n');
                    } catch (IOException e) {
                        log.error("error writing to HDFS.", e);
                    }
                });
            }

            log.debug("Done writing {} messages to HDFS {}", bufferList.size(), filePath);
        }
    }

    public HdfsService() {
    }

    public HdfsService(Db db) {
        hdfs = db;
    }

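    // Runs once after construction: builds the HDFS connection from the Db
    // settings; isReady reflects whether the connection attempt succeeded.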
    @PostConstruct
    private void init() {
        // Initialize the HDFS connection.
        try {
            // Get the configuration of the Hadoop system.
            Configuration hdfsConfig = new Configuration();

            int port = hdfs.getPort() == null ? 8020 : hdfs.getPort(); // 8020 is the default NameNode IPC port

            String hdfsuri = String.format("hdfs://%s:%s", hdfs.getHost(), port);
            hdfsConfig.set("fs.defaultFS", hdfsuri);
            System.setProperty("HADOOP_USER_NAME", hdfs.getLogin());

            log.info("Connecting to -- {} as {}", hdfsuri, hdfs.getLogin());

            fileSystem = FileSystem.get(hdfsConfig);

            // Disable Hadoop's shutdown hook: we manage shutdown ourselves and
            // need the HDFS connection to stay open long enough to flush data.
            ShutdownHookManager hadoopShutdownHookManager = ShutdownHookManager.get();
            hadoopShutdownHookManager.clearShutdownHooks();

            isReady = true;
        } catch (Exception ex) {
            log.error("Error connecting to HDFS.", ex);
            isReady = false;
        }
    }

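    // Called on bean destruction: flush whatever is still buffered on the
    // calling thread, then close the HDFS connection. The shared shutdown
    // read lock keeps this from racing with other shutdown work.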
    @PreDestroy
    public void cleanUp() {
        config.getShutdownLock().readLock().lock();

        try {
            log.info("flushing buffers and closing fileSystem at cleanUp.");
            flush();
            fileSystem.close();
        } catch (IOException e) {
            log.error("fileSystem.close() failed at cleanUp.", e);
        } finally {
            config.getShutdownLock().readLock().unlock();
        }
    }

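    // Force-flush every buffered topic, regardless of batch size or stall
    // interval. Note that buffers are thread-local, so this only affects
    // data buffered by the calling thread.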
    public void flush() {
        log.info("Force flush ALL data, regardless of stall");
        bufferLocal.get().forEach((topic, buffer) -> buffer.flush(topic));
    }

    // If no new data has arrived for a topic for a while, flush its buffer.
    public void flushStall() {
        log.debug("Flush stall data");
        bufferLocal.get().forEach((topic, buffer) -> buffer.flushStall(topic));
    }

    // Used when the raw message text (rather than parsed JSON) should be saved.
    public void saveMessages(EffectiveTopic topic, List<Pair<Long, String>> messages) {
        String topicStr = topic.getName();

        Map<String, Buffer> bufferMap = bufferLocal.get();
        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

        buffer.addData(messages);

        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
            buffer.flush(topicStr);
        } else {
            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
        }
    }

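    // Buffer JSON messages for the topic; flush immediately in synchronous
    // mode, otherwise only once the batch size threshold is reached.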
    @Override
    public void saveJsons(EffectiveTopic topic, List<JSONObject> jsons) {
        String topicStr = topic.getName();

        Map<String, Buffer> bufferMap = bufferLocal.get();
        final Buffer buffer = bufferMap.computeIfAbsent(topicStr, k -> new Buffer());

        buffer.addData2(jsons);

        if (!config.isAsync() || buffer.getData().size() >= config.getHdfsBatchSize()) {
            buffer.flush(topicStr);
        } else {
            log.debug("buffer size too small to flush {}: bufferData.size() {} < config.getHdfsBatchSize() {}", topicStr, buffer.getData().size(), config.getHdfsBatchSize());
        }
    }

}
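
/*
 * A minimal usage sketch (hypothetical caller; the names and wiring below are
 * assumptions for illustration, not part of this file). Because buffers are
 * thread-local, flushStall() only sees data buffered by the calling thread,
 * so the same worker thread should both save and flush:
 *
 *     @Autowired
 *     HdfsService hdfsService;
 *
 *     void pollLoop(EffectiveTopic topic) {
 *         while (running) {
 *             List<JSONObject> jsons = fetchBatch(topic); // hypothetical source
 *             hdfsService.saveJsons(topic, jsons);        // buffers; may flush on batch size
 *             hdfsService.flushStall();                   // flushes buffers that have gone quiet
 *         }
 *     }
 */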