HDFSWriter microservice working copy
[demo.git] / vnfs / DAaaS / microservices / GoApps / src / go-hdfs-writer / pkg / pipeline / pipeline.go
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
new file mode 100644 (file)
index 0000000..c5dbd3c
--- /dev/null
@@ -0,0 +1,130 @@
+package pipeline
+
+import (
+       "fmt"
+       "os"
+       "github.com/colinmarc/hdfs"
+       "github.com/confluentinc/confluent-kafka-go/kafka"
+       utils "hdfs-writer/pkg/utils"
+       
+)
+
+// BuildWriterPipeline builds a pipeline
+func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
+       slogger := utils.GetLoggerInstance()
+       topics := make([]string, 1)
+       topics[0] = k.GetTopic()
+       
+       c,err := kafka.NewConsumer(&kafka.ConfigMap{
+               "bootstrap.servers": k.GetBroker(),
+               "broker.address.family": "v4",
+               "group.id":              k.GetGroup(),
+               "session.timeout.ms":    6000,
+               "auto.offset.reset":     "earliest"})
+
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
+               os.Exit(1)
+       }
+       fmt.Printf("Created Consumer %v\n", c)
+       err = c.SubscribeTopics(topics, nil)
+
+       run := true
+       setUpPipeline := false
+
+       var hdfsFileWriter *hdfs.FileWriter
+       var hdfsFileWriterError error
+       // HDFS CLIENT CREATION
+       //client := utils.GetHdfsClientInstance(h.GetHdfsURL())
+       client := utils.CreateHdfsClient(h.GetHdfsURL())
+       
+
+       for run==true {
+               select {
+               case sig := <-sigchan:
+                       client.Close()
+                       if hdfsFileWriter!=nil{
+                               cleanup(hdfsFileWriter)
+                       }
+                       slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
+                       run = false
+               default:
+                       //slogger.Info("Running default option ....")
+                       ev := c.Poll(100)
+                       if ev == nil {
+                               continue
+                       }
+                       //:: BEGIN : Switch between different types of messages that come out of kafka
+                       switch e := ev.(type){
+                       case *kafka.Message:
+                               slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
+                               dataStr := string(e.Value)
+                               slogger.Infof("byte array ::: %s", []byte(dataStr))
+                               fileInfo, fileInfoError := client.Stat("/" + k.GetTopic())
+                               // create file if it doesnt exists already
+                               if fileInfoError != nil {
+                                       slogger.Infof("Error::: %s",fileInfoError)
+                                       slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
+                                       hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic())
+                                       if hdfsFileWriterError !=nil {
+                                               slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
+                                                "/"+k.GetTopic(), hdfsFileWriterError.Error())
+                                               panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
+                                       }
+                                       _= client.Chmod("/"+k.GetTopic(), 0777);
+                               }
+                               newDataStr := dataStr + "\n"
+                               // file exists case, so just append
+                               hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name())
+                               
+                               if hdfsFileWriterError != nil || hdfsFileWriter==nil{
+                                       if(hdfsFileWriter==nil){
+                                               slogger.Infof("hdfsFileWriter is NULL !!")
+                                       }
+                                       slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
+                                        "/"+k.GetTopic(),hdfsFileWriterError)
+                                       panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
+                               }
+                               bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
+                               if bytesWritten > 0 && error == nil {
+                                       slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
+                                       slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
+                                       
+                                       if setUpPipeline==false{
+                                               slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
+                                               "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL())
+                                               setUpPipeline = true
+                                       }
+                               } else {
+                                       slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error)
+                               }
+                               hdfsFileWriter.Close()
+                       
+                       case kafka.Error:
+                               // Errors should generally be considered
+                               // informational, the client will try to
+                               // automatically recover.
+                               // But in this example we choose to terminate
+                               // the application if all brokers are down.
+                               fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+                               if e.Code() == kafka.ErrAllBrokersDown {
+                                       run = false
+                               }
+                       
+                       default:
+                               fmt.Printf("Ignored %v\n", e)
+                       } //:: END : Switch between different types of messages that come out of kafka
+               } // END: select channel
+       } // END : infinite loop
+
+       fmt.Printf("Closing the consumer")
+}
+
+func cleanup(h *hdfs.FileWriter){
+       if h!=nil{
+               err := h.Close()
+               if err!=nil{
+                       fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error())
+               }
+       }
+}
\ No newline at end of file