X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=vnfs%2FDAaaS%2Ftraining-core%2Fhdfs-writer-source-code%2Fhdfs-writer%2Fsrc%2Fmain%2Fjava%2FHdfsWriter.java;fp=vnfs%2FDAaaS%2Ftraining-core%2Fhdfs-writer-source-code%2Fhdfs-writer%2Fsrc%2Fmain%2Fjava%2FHdfsWriter.java;h=cd5b6635b950f0618b7072244a4a651d0d5a0fcf;hb=24d0c131ec95b43934dfb76ec397392ff851dbcf;hp=0000000000000000000000000000000000000000;hpb=0696abe27f2f8f2c19931a0d2c5ad0badaf5d236;p=demo.git diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java new file mode 100644 index 00000000..cd5b6635 --- /dev/null +++ b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java @@ -0,0 +1,40 @@ +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +public class HdfsWriter { + + private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class); + + + public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException { + Configuration hdfsConfiguration = new Configuration(); + FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration); + log.info(":::Created hdfsFileSystem:::"); + return hdfsFileSystem; + } + + + public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException { + fsDataOutputStream.writeBytes(bytesFromKafka); + log.info(":::Wrote to HDFS:::"); + } + + + public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException { + FSDataOutputStream fsDataOutputStream; + if(!hdfsFileSystem.exists(new Path("/"+hdfsFile))) + fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile)); + else + fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile)); + log.info(":::HDFSWriter invoked:::"); + return fsDataOutputStream; + } + +}