HDFS-WriterApp: Fixed all the code review comments
[demo.git] / vnfs / DAaaS / microservices / GoApps / src / go-hdfs-writer / pkg / pipeline / pipeline.go
index c5dbd3c..2e192e9 100644 (file)
@@ -2,52 +2,84 @@ package pipeline
 
 import (
        "fmt"
-       "os"
        "github.com/colinmarc/hdfs"
        "github.com/confluentinc/confluent-kafka-go/kafka"
+       guuid "github.com/google/uuid"
+       "os"
+       "sync"
+
        utils "hdfs-writer/pkg/utils"
-       
 )
 
-// BuildWriterPipeline builds a pipeline
-func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
-       slogger := utils.GetLoggerInstance()
+var slogger = utils.GetLoggerInstance()
+
+// ChannelMap is the global map storing writer names as keys and their
+// shutdown-signal channels as values.
+var ChannelMap = make(map[string]chan bool)
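+// Note: a plain Go map is not safe for concurrent use; if CreatePipeline and
+// DeletePipeline can run from concurrent goroutines, access to ChannelMap
+// would need a sync.Mutex (or a sync.Map) around it.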
+
+// Wg is a WaitGroup that ensures all the writers have enough time to clean up
+// before the process exits.
+var Wg sync.WaitGroup
+const writerStr = "writer"
+
+// CreatePipeline initiates the building of a pipeline
+func CreatePipeline(kc utils.KafkaConfig, hc utils.HdfsConfig) string {
+       pipelineChan := make(chan bool)
+       uuid := guuid.New().String()
+       slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
+       writerName := writerStr + "-" + uuid[len(uuid)-4:]
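+       // e.g. a uuid ending in "...d4e5" yields the writerName "writer-d4e5"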
+       slogger.Infof("::writerName:: %s ", writerName)
+       ChannelMap[writerName] = pipelineChan
+
+       // every create request adds one to the WaitGroup
+       Wg.Add(1)
+       // invoke the goroutine that builds the pipeline
+       go buildWriterPipeline(kc, hc, writerName, pipelineChan)
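+       // the matching Wg.Done() runs in the writer goroutine on every exit path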
+       return writerName
+}
+
+// buildWriterPipeline builds a pipeline
+func buildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan bool) {
+
        topics := make([]string, 1)
-       topics[0] = k.GetTopic()
-       
-       c,err := kafka.NewConsumer(&kafka.ConfigMap{
-               "bootstrap.servers": k.GetBroker(),
+       topics[0] = k.Topic
+
+       c, err := kafka.NewConsumer(&kafka.ConfigMap{
+               "bootstrap.servers":     k.Broker,
                "broker.address.family": "v4",
-               "group.id":              k.GetGroup(),
+               "group.id":              k.Group,
                "session.timeout.ms":    6000,
                "auto.offset.reset":     "earliest"})
 
        if err != nil {
                fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
-               os.Exit(1)
+               slogger.Infof("Failed to create consumer: %s", err.Error())
+               delete(ChannelMap, writerName)
+               Wg.Done()
+               return
        }
        fmt.Printf("Created Consumer %v\n", c)
        err = c.SubscribeTopics(topics, nil)
+       if err != nil {
+               slogger.Infof("Failed to subscribe to topics %v: %s", topics, err.Error())
+               c.Close()
+               delete(ChannelMap, writerName)
+               Wg.Done()
+               return
+       }
 
-       run := true
        setUpPipeline := false
 
        var hdfsFileWriter *hdfs.FileWriter
        var hdfsFileWriterError error
        // HDFS CLIENT CREATION
        //client := utils.GetHdfsClientInstance(h.GetHdfsURL())
-       client := utils.CreateHdfsClient(h.GetHdfsURL())
-       
+       client := utils.CreateHdfsClient(h.HdfsURL)
 
-       for run==true {
+       for {
                select {
                case sig := <-sigchan:
+                       defer Wg.Done()
+                       // close the Kafka consumer as well as the HDFS client on shutdown
+                       c.Close()
                        client.Close()
-                       if hdfsFileWriter!=nil{
+                       if hdfsFileWriter != nil {
                                cleanup(hdfsFileWriter)
                        }
                        slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
-                       run = false
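+                       // sigchan is closed here on the receiving side; DeletePipeline removes
+                       // the entry from ChannelMap so no later send can hit the closed channel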
+                       close(sigchan)
+                       return
                default:
                        //slogger.Info("Running default option ....")
                        ev := c.Poll(100)
@@ -55,51 +87,52 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str
                                continue
                        }
                        //:: BEGIN : Switch between different types of messages that come out of kafka
-                       switch e := ev.(type){
+                       switch e := ev.(type) {
                        case *kafka.Message:
                                slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
                                dataStr := string(e.Value)
                                slogger.Infof("byte array ::: %s", []byte(dataStr))
-                               fileInfo, fileInfoError := client.Stat("/" + k.GetTopic())
+                               fileInfo, fileInfoError := client.Stat("/" + k.Topic)
                                // create the file if it doesn't exist already
                                if fileInfoError != nil {
-                                       slogger.Infof("Error::: %s",fileInfoError)
-                                       slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
-                                       hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic())
-                                       if hdfsFileWriterError !=nil {
+                                       slogger.Infof("Error::: %s", fileInfoError)
+                                       slogger.Infof("Creating file::: %s", "/"+k.Topic)
+                                       hdfsFileWriterError = client.CreateEmptyFile("/" + k.Topic)
+                                       if hdfsFileWriterError != nil {
                                                slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
-                                                "/"+k.GetTopic(), hdfsFileWriterError.Error())
-                                               panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
+                                                       "/"+k.Topic, hdfsFileWriterError.Error())
+                                               continue
                                        }
-                                       _= client.Chmod("/"+k.GetTopic(), 0777);
+                                       _ = client.Chmod("/"+k.Topic, 0777)
                                }
                                newDataStr := dataStr + "\n"
                                // file exists case, so just append
-                               hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name())
-                               
-                               if hdfsFileWriterError != nil || hdfsFileWriter==nil{
-                                       if(hdfsFileWriter==nil){
+                               hdfsFileWriter, hdfsFileWriterError = client.Append("/" + fileInfo.Name())
+
+                               if hdfsFileWriterError != nil {
+                                       if hdfsFileWriter == nil {
                                                slogger.Infof("hdfsFileWriter is NULL !!")
                                        }
                                        slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
-                                        "/"+k.GetTopic(),hdfsFileWriterError)
-                                       panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
-                               }
-                               bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
-                               if bytesWritten > 0 && error == nil {
-                                       slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
-                                       slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
-                                       
-                                       if setUpPipeline==false{
-                                               slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
-                                               "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL())
-                                               setUpPipeline = true
-                                       }
+                                               "/"+k.Topic, hdfsFileWriterError)
+                                       continue
                                } else {
-                                       slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error)
+                                       bytesWritten, writeErr := hdfsFileWriter.Write([]byte(newDataStr))
+                                       if bytesWritten > 0 && writeErr == nil {
+                                               slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
+                                               slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
+
+                                               if !setUpPipeline {
+                                                       slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup, "+
+                                                               "watching for more messages.. ", k.Topic, h.HdfsURL)
+                                                       setUpPipeline = true
+                                               }
+                                       } else {
+                                               slogger.Infof("::: Unable to write to HDFS\n :::Error:: %s", writeErr)
+                                       }
+                                       // close the writer after each message, in both cases, so the
+                                       // HDFS lease is always released
+                                       hdfsFileWriter.Close()
                                }
-                               hdfsFileWriter.Close()
-                       
+
                        case kafka.Error:
                                // Errors should generally be considered
                                // informational, the client will try to
@@ -108,23 +141,33 @@ func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName str
                                // the application if all brokers are down.
                                fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
                                if e.Code() == kafka.ErrAllBrokersDown {
-                                       run = false
+                                       client.Close()
+                                       delete(ChannelMap, writerName)
+                                       Wg.Done()
+                                       return
                                }
-                       
+
                        default:
                                fmt.Printf("Ignored %v\n", e)
                        } //:: END : Switch between different types of messages that come out of kafka
                } // END: select channel
        } // END : infinite loop
-
-       fmt.Printf("Closing the consumer")
 }
 
-func cleanup(h *hdfs.FileWriter){
-       if h!=nil{
+func cleanup(h *hdfs.FileWriter) {
+       if h != nil {
                err := h.Close()
-               if err!=nil{
-                       fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error())
+               if err != nil {
+                       fmt.Printf(":::Error occured while closing hdfs writer::: \n%s", err.Error())
                }
+
        }
-}
\ No newline at end of file
+       fmt.Printf("\n:::Clean up executed ::: \n")
+}
+
+// DeletePipeline deletes a writer pipeline
+func DeletePipeline(writerName string) {
+       slogger.Infof("::Writer to be closed:: %s", writerName)
+       toBeClosedChannel := ChannelMap[writerName]
+       toBeClosedChannel <- true
+       // deleting the channel from ChannelMap after closure to
+       // avoid closing the closed channel
+       delete(ChannelMap, writerName)
+}
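
For reference, a minimal sketch of how a caller might drive these entry points
(the main wrapper and the broker/topic/URL literals below are illustrative
assumptions, not part of this change):

    package main

    import (
            pipeline "hdfs-writer/pkg/pipeline"
            utils "hdfs-writer/pkg/utils"
    )

    func main() {
            // hypothetical config values; the real writer receives these from its API
            kc := utils.KafkaConfig{Broker: "kafka:9092", Group: "hdfs-writer", Topic: "metrics"}
            hc := utils.HdfsConfig{HdfsURL: "namenode:8020"}

            // CreatePipeline stores a channel in ChannelMap, bumps the WaitGroup,
            // and starts the writer goroutine; it returns a name like "writer-d4e5"
            writerName := pipeline.CreatePipeline(kc, hc)

            // ... later: signal the writer to stop and wait for its cleanup
            pipeline.DeletePipeline(writerName)
            pipeline.Wg.Wait()
    }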