import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
11 public class HdfsWriter {
13 private static Logger log = LoggerFactory.getLogger(CreateKafkaConsumer.class);
16 public FileSystem createHdfsFileSystem(String hdfsDestination) throws IOException {
17 Configuration hdfsConfiguration = new Configuration();
18 FileSystem hdfsFileSystem = FileSystem.get(URI.create(hdfsDestination), hdfsConfiguration);
19 log.info(":::Created hdfsFileSystem:::");
20 return hdfsFileSystem;
24 public void writeMessageToHdfs(FSDataOutputStream fsDataOutputStream, String bytesFromKafka) throws IOException {
25 fsDataOutputStream.writeBytes(bytesFromKafka);
26 log.info(":::Wrote to HDFS:::");
30 public FSDataOutputStream invokeHdfsWriter(FileSystem hdfsFileSystem, String hdfsFile) throws IOException {
31 FSDataOutputStream fsDataOutputStream;
32 if(!hdfsFileSystem.exists(new Path("/"+hdfsFile)))
33 fsDataOutputStream = hdfsFileSystem.create(new Path("/"+hdfsFile));
35 fsDataOutputStream = hdfsFileSystem.append(new Path("/"+hdfsFile));
36 log.info(":::HDFSWriter invoked:::");
37 return fsDataOutputStream;