Source code initial workingDraft kafka2hdfs writer.
[demo.git] / vnfs / DAaaS / training-core / hdfs-writer-source-code / hdfs-writer / src / main / java / HdfsWriter.java
diff --git a/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java b/vnfs/DAaaS/training-core/hdfs-writer-source-code/hdfs-writer/src/main/java/HdfsWriter.java
new file mode 100644 (file)
index 0000000..cd5b663
--- /dev/null
@@ -0,0 +1,40 @@
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class HdfsWriter {
+
+    private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);
+
+
+    public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
+        Configuration hdfsConfiguration = new Configuration();
+        FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
+        log.info(":::Created hdfsFileSystem:::");
+        return hdfsFileSystem;
+    }
+
+
+    public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
+        fsDataOutputStream.writeBytes(bytesFromKafka);
+        log.info(":::Wrote to HDFS:::");
+    }
+
+
+    public FSDataOutputStream  invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
+        FSDataOutputStream fsDataOutputStream;
+        if(!hdfsFileSystem.exists(new Path("/"+hdfsFile)))
+            fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile));
+        else
+            fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile));
+        log.info(":::HDFSWriter invoked:::");
+        return fsDataOutputStream;
+    }
+
+}