6 "github.com/colinmarc/hdfs"
7 "github.com/confluentinc/confluent-kafka-go/kafka"
8 utils "hdfs-writer/pkg/utils"
// BuildWriterPipeline builds a pipeline: it creates a Kafka consumer for the
// configured topic and appends every received message, newline-terminated, to
// the HDFS file "/<topic>" (creating that file on first use). The loop runs
// until a value is received on sigchan, which closes any open HDFS writer.
//
// NOTE(review): several source lines are elided in this view (error-handling
// branches, the enclosing for/select loop, the switch case labels, and some
// closing braces); the comments below describe only the visible statements.
func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
	slogger := utils.GetLoggerInstance()
	// Subscribe to exactly one topic, taken from the Kafka config.
	topics := make([]string, 1)
	topics[0] = k.GetTopic()
	// Create the consumer; "earliest" so messages produced before this writer
	// started are still drained.
	c,err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": k.GetBroker(),
		"broker.address.family": "v4",
		"group.id": k.GetGroup(),
		"session.timeout.ms": 6000,
		"auto.offset.reset": "earliest"})
	// Consumer-creation failure path (surrounding branch elided in this view).
	fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
	fmt.Printf("Created Consumer %v\n", c)
	err = c.SubscribeTopics(topics, nil)
	// NOTE(review): err from SubscribeTopics is not checked in the visible
	// lines, and no c.Close() is visible either — confirm both exist in the
	// elided code.
	setUpPipeline := false
	// Declared outside the (elided) loop so they persist across iterations.
	var hdfsFileWriter *hdfs.FileWriter
	var hdfsFileWriterError error
	// HDFS CLIENT CREATION
	//client := utils.GetHdfsClientInstance(h.GetHdfsURL())
	client := utils.CreateHdfsClient(h.GetHdfsURL())
	// Shutdown request: close any open HDFS writer, then log and terminate.
	case sig := <-sigchan:
		if hdfsFileWriter!=nil{
			cleanup(hdfsFileWriter)
		slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
		//slogger.Info("Running default option ....")
		//:: BEGIN : Switch between different types of messages that come out of kafka
		switch e := ev.(type){
		// (case label elided — presumably *kafka.Message: a consumed record.)
			slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
			dataStr := string(e.Value)
			slogger.Infof("byte array ::: %s", []byte(dataStr))
			// Stat the per-topic file to decide between create and append.
			fileInfo, fileInfoError := client.Stat("/" + k.GetTopic())
			// create file if it doesnt exists already
			if fileInfoError != nil {
				slogger.Infof("Error::: %s",fileInfoError)
				slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
				hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic())
				if hdfsFileWriterError !=nil {
					slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
						"/"+k.GetTopic(), hdfsFileWriterError.Error())
					panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
				// Make the new file world-writable; the Chmod error is
				// deliberately discarded (best-effort).
				_= client.Chmod("/"+k.GetTopic(), 0777);
			// Each message becomes one newline-terminated line in HDFS.
			newDataStr := dataStr + "\n"
			// file exists case, so just append
			hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name())
			// NOTE(review): on the create path fileInfo is nil (Stat failed);
			// confirm the elided control flow prevents fileInfo.Name() from
			// being called with a nil fileInfo here.
			if hdfsFileWriterError != nil || hdfsFileWriter==nil{
				if(hdfsFileWriter==nil){
					slogger.Infof("hdfsFileWriter is NULL !!")
				slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
					"/"+k.GetTopic(),hdfsFileWriterError)
				panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
			// NOTE(review): `error` shadows the builtin type name — worth
			// renaming (e.g. writeErr) if this block is ever rewritten.
			bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
			if bytesWritten > 0 && error == nil {
				slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
				slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
				// One-time "pipeline ready" log after the first successful write.
				if setUpPipeline==false{
					slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
						"watching for more messages.. ",k.GetTopic(), h.GetHdfsURL())
			// Write-failure path (surrounding branch elided in this view).
			// NOTE(review): slogger.Info is given printf-style arguments;
			// Infof looks intended — confirm against the logger API.
			slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error)
			hdfsFileWriter.Close()
		// (case label elided — presumably kafka.Error.)
		// Errors should generally be considered
		// informational, the client will try to
		// automatically recover.
		// But in this example we choose to terminate
		// the application if all brokers are down.
			fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
			if e.Code() == kafka.ErrAllBrokersDown {
		// (default case elided — any other event type is just logged.)
			fmt.Printf("Ignored %v\n", e)
		} //:: END : Switch between different types of messages that come out of kafka
	} // END: select channel
	} // END : infinite loop
	fmt.Printf("Closing the consumer")
// cleanup closes the given HDFS file writer, logging (not returning) any
// error raised by the close.
//
// NOTE(review): the intermediate lines (the Close call and its error check)
// are elided in this view; `err` below is defined in that elided code.
func cleanup(h *hdfs.FileWriter){
	// Best-effort reporting only — the error is printed and then dropped.
	fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error())