Refactor Distributed Analytics project structure
[demo.git] vnfs/DAaaS/deploy/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;

public class HdfsWriter {

    private static final Logger log = LoggerFactory.getLogger(HdfsWriter.class);

    // Builds a FileSystem handle for the HDFS destination URI (e.g. hdfs://namenode:port).
    public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
        Configuration hdfsConfiguration = new Configuration();
        FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
        log.info(":::Created hdfsFileSystem:::");
        return hdfsFileSystem;
    }

    // Writes a consumed Kafka payload to the open HDFS output stream.
    public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
        fsDataOutputStream.writeBytes(bytesFromKafka);
        log.info(":::Wrote to HDFS:::");
    }

    // Opens an output stream for the target file under the HDFS root:
    // creates the file on first use, appends to it thereafter.
    public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
        FSDataOutputStream fsDataOutputStream;
        Path hdfsPath = new Path("/" + hdfsFile);
        if (!hdfsFileSystem.exists(hdfsPath)) {
            fsDataOutputStream = hdfsFileSystem.create(hdfsPath);
        } else {
            fsDataOutputStream = hdfsFileSystem.append(hdfsPath);
        }
        log.info(":::HDFSWriter invoked:::");
        return fsDataOutputStream;
    }

}
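
A minimal usage sketch of how the three methods compose: create the FileSystem once, open the stream (create-or-append), write each consumed record, then close. The namenode URI, file name, and payload below are hypothetical placeholders, not values taken from this repo.

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;

public class HdfsWriterExample {
    public static void main(String[] args) throws Exception {
        HdfsWriter writer = new HdfsWriter();
        // Hypothetical namenode address; substitute the cluster's actual URI.
        FileSystem fs = writer.createHdfsFileSystem("hdfs://hdfs-namenode:9000");
        // Creates /kafka-messages.txt on the first run, appends on later runs.
        FSDataOutputStream out = writer.invokeHdfsWriter(fs, "kafka-messages.txt");
        writer.writeMessageToHdfs(out, "example Kafka payload\n");
        out.close(); // flush buffered bytes and release the stream
        fs.close();
    }
}

Note that the stream and FileSystem are closed by the caller; HdfsWriter itself never closes what it opens, so whoever drives the Kafka consume loop owns that cleanup.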