HDFS-WriterApp-Fixed all the code review comments
[demo.git] / vnfs / DAaaS / microservices / GoApps / src / go-hdfs-writer / pkg / pipeline / pipeline.go
1 package pipeline
2
3 import (
4         "fmt"
5         "github.com/colinmarc/hdfs"
6         "github.com/confluentinc/confluent-kafka-go/kafka"
7         guuid "github.com/google/uuid"
8         "os"
9         "sync"
10
11         utils "hdfs-writer/pkg/utils"
12 )
13
// slogger is the shared structured logger for this package.
var slogger = utils.GetLoggerInstance()

// ChannelMap is the global map to store writerNames as key and channels as values.
// Each channel is used to signal the corresponding writer goroutine to stop.
// NOTE(review): this map is written by CreatePipeline/DeletePipeline and by the
// writer goroutines without a mutex — presumably callers serialize access; confirm.
//var ChannelMap =make(map[string]chan struct{})
var ChannelMap = make(map[string]chan bool)

// Wg is of type WaitGroup; it ensures all the writers have enough time to clean up
// before the application exits (each CreatePipeline adds 1, each writer calls Done).
var Wg sync.WaitGroup

// writerStr is the prefix used when generating writer names.
var writerStr = "writer"
23
24 // CreatePipeline initiates the building of a pipeline
25 func CreatePipeline(kc utils.KafkaConfig, hc utils.HdfsConfig) string {
26         //pipelineChan := make(chan struct{})
27         pipelineChan := make(chan bool)
28         uuid := guuid.New().String()
29         slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
30         writerName := writerStr + "-" + uuid[len(uuid)-4:]
31         slogger.Infof("::writerName:: %s ", writerName)
32         ChannelMap[writerName] = pipelineChan
33
34         //Every create request shall add 1 to the WaitGroup
35         Wg.Add(1)
36         // envoke the go routine to build pipeline
37         go buildWriterPipeline(kc, hc, writerName, ChannelMap[writerName])
38         return writerName
39 }
40
41 // buildWriterPipeline builds a pipeline
42 func buildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan bool) {
43
44         topics := make([]string, 1)
45         topics[0] = k.Topic
46
47         c, err := kafka.NewConsumer(&kafka.ConfigMap{
48                 "bootstrap.servers":     k.Broker,
49                 "broker.address.family": "v4",
50                 "group.id":              k.Group,
51                 "session.timeout.ms":    6000,
52                 "auto.offset.reset":     "earliest"})
53
54         if err != nil {
55                 fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
56                 slogger.Info("Failed to create consumer: %s", err.Error())
57                 delete(ChannelMap, writerName)
58                 Wg.Done()
59                 return
60         }
61         fmt.Printf("Created Consumer %v\n", c)
62         err = c.SubscribeTopics(topics, nil)
63
64         setUpPipeline := false
65
66         var hdfsFileWriter *hdfs.FileWriter
67         var hdfsFileWriterError error
68         // HDFS CLIENT CREATION
69         //client := utils.GetHdfsClientInstance(h.GetHdfsURL())
70         client := utils.CreateHdfsClient(h.HdfsURL)
71
72         for {
73                 select {
74                 case sig := <-sigchan:
75                         defer Wg.Done()
76                         client.Close()
77                         if hdfsFileWriter != nil {
78                                 cleanup(hdfsFileWriter)
79                         }
80                         slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
81                         close(sigchan)
82                         return
83                 default:
84                         //slogger.Info("Running default option ....")
85                         ev := c.Poll(100)
86                         if ev == nil {
87                                 continue
88                         }
89                         //:: BEGIN : Switch between different types of messages that come out of kafka
90                         switch e := ev.(type) {
91                         case *kafka.Message:
92                                 slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
93                                 dataStr := string(e.Value)
94                                 slogger.Infof("byte array ::: %s", []byte(dataStr))
95                                 fileInfo, fileInfoError := client.Stat("/" + k.Topic)
96                                 // create file if it doesnt exists already
97                                 if fileInfoError != nil {
98                                         slogger.Infof("Error::: %s", fileInfoError)
99                                         slogger.Infof("Creating file::: %s", "/"+k.Topic)
100                                         hdfsFileWriterError = client.CreateEmptyFile("/" + k.Topic)
101                                         if hdfsFileWriterError != nil {
102                                                 slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
103                                                         "/"+k.Topic, hdfsFileWriterError.Error())
104                                                 continue
105                                         }
106                                         _ = client.Chmod("/"+k.Topic, 0777)
107                                 }
108                                 newDataStr := dataStr + "\n"
109                                 // file exists case, so just append
110                                 hdfsFileWriter, hdfsFileWriterError = client.Append("/" + fileInfo.Name())
111
112                                 if hdfsFileWriterError != nil {
113                                         if hdfsFileWriter == nil {
114                                                 slogger.Infof("hdfsFileWriter is NULL !!")
115                                         }
116                                         slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
117                                                 "/"+k.Topic, hdfsFileWriterError)
118                                         continue
119                                 } else {
120                                         bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
121                                         if bytesWritten > 0 && error == nil {
122                                                 slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
123                                                 slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
124
125                                                 if setUpPipeline == false {
126                                                         slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
127                                                                 "watching for more messages.. ", k.Topic, h.HdfsURL)
128                                                         setUpPipeline = true
129                                                 }
130                                                 hdfsFileWriter.Close()
131                                         } else {
132                                                 slogger.Info("::: Unable to write to HDFS\n :::Error:: %s", error)
133                                         }
134                                 }
135
136                         case kafka.Error:
137                                 // Errors should generally be considered
138                                 // informational, the client will try to
139                                 // automatically recover.
140                                 // But in this example we choose to terminate
141                                 // the application if all brokers are down.
142                                 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
143                                 if e.Code() == kafka.ErrAllBrokersDown {
144                                         return
145                                 }
146
147                         default:
148                                 fmt.Printf("Ignored %v\n", e)
149                         } //:: END : Switch between different types of messages that come out of kafka
150                 } // END: select channel
151         } // END : infinite loop
152 }
153
154 func cleanup(h *hdfs.FileWriter) {
155         if h != nil {
156                 err := h.Close()
157                 if err != nil {
158                         fmt.Printf(":::Error occured while closing hdfs writer::: \n%s", err.Error())
159                 }
160
161         }
162         fmt.Printf("\n:::Clean up executed ::: \n")
163 }
164
165 // DeletePipeline deletes a writer pipeline
166 func DeletePipeline(writerName string) {
167         slogger.Infof("::Writer to be closed:: %s", writerName)
168         toBeClosedChannel := ChannelMap[writerName]
169         toBeClosedChannel <- true
170         // deleting the channel from ChannelMap after closure to
171         // avoid closing the closed channel
172         delete(ChannelMap, writerName)
173 }