HDFSWriter microservice working copy
[demo.git] / vnfs / DAaaS / microservices / GoApps / src / go-hdfs-writer / pkg / pipeline / pipeline.go
1 package pipeline
2
3 import (
4         "fmt"
5         "os"
6         "github.com/colinmarc/hdfs"
7         "github.com/confluentinc/confluent-kafka-go/kafka"
8         utils "hdfs-writer/pkg/utils"
9         
10 )
11
12 // BuildWriterPipeline builds a pipeline
13 func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
14         slogger := utils.GetLoggerInstance()
15         topics := make([]string, 1)
16         topics[0] = k.GetTopic()
17         
18         c,err := kafka.NewConsumer(&kafka.ConfigMap{
19                 "bootstrap.servers": k.GetBroker(),
20                 "broker.address.family": "v4",
21                 "group.id":              k.GetGroup(),
22                 "session.timeout.ms":    6000,
23                 "auto.offset.reset":     "earliest"})
24
25         if err != nil {
26                 fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
27                 os.Exit(1)
28         }
29         fmt.Printf("Created Consumer %v\n", c)
30         err = c.SubscribeTopics(topics, nil)
31
32         run := true
33         setUpPipeline := false
34
35         var hdfsFileWriter *hdfs.FileWriter
36         var hdfsFileWriterError error
37         // HDFS CLIENT CREATION
38         //client := utils.GetHdfsClientInstance(h.GetHdfsURL())
39         client := utils.CreateHdfsClient(h.GetHdfsURL())
40         
41
42         for run==true {
43                 select {
44                 case sig := <-sigchan:
45                         client.Close()
46                         if hdfsFileWriter!=nil{
47                                 cleanup(hdfsFileWriter)
48                         }
49                         slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
50                         run = false
51                 default:
52                         //slogger.Info("Running default option ....")
53                         ev := c.Poll(100)
54                         if ev == nil {
55                                 continue
56                         }
57                         //:: BEGIN : Switch between different types of messages that come out of kafka
58                         switch e := ev.(type){
59                         case *kafka.Message:
60                                 slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
61                                 dataStr := string(e.Value)
62                                 slogger.Infof("byte array ::: %s", []byte(dataStr))
63                                 fileInfo, fileInfoError := client.Stat("/" + k.GetTopic())
64                                 // create file if it doesnt exists already
65                                 if fileInfoError != nil {
66                                         slogger.Infof("Error::: %s",fileInfoError)
67                                         slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
68                                         hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic())
69                                         if hdfsFileWriterError !=nil {
70                                                 slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
71                                                  "/"+k.GetTopic(), hdfsFileWriterError.Error())
72                                                 panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
73                                         }
74                                         _= client.Chmod("/"+k.GetTopic(), 0777);
75                                 }
76                                 newDataStr := dataStr + "\n"
77                                 // file exists case, so just append
78                                 hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name())
79                                 
80                                 if hdfsFileWriterError != nil || hdfsFileWriter==nil{
81                                         if(hdfsFileWriter==nil){
82                                                 slogger.Infof("hdfsFileWriter is NULL !!")
83                                         }
84                                         slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
85                                          "/"+k.GetTopic(),hdfsFileWriterError)
86                                         panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
87                                 }
88                                 bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
89                                 if bytesWritten > 0 && error == nil {
90                                         slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
91                                         slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
92                                         
93                                         if setUpPipeline==false{
94                                                 slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
95                                                 "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL())
96                                                 setUpPipeline = true
97                                         }
98                                 } else {
99                                         slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error)
100                                 }
101                                 hdfsFileWriter.Close()
102                         
103                         case kafka.Error:
104                                 // Errors should generally be considered
105                                 // informational, the client will try to
106                                 // automatically recover.
107                                 // But in this example we choose to terminate
108                                 // the application if all brokers are down.
109                                 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
110                                 if e.Code() == kafka.ErrAllBrokersDown {
111                                         run = false
112                                 }
113                         
114                         default:
115                                 fmt.Printf("Ignored %v\n", e)
116                         } //:: END : Switch between different types of messages that come out of kafka
117                 } // END: select channel
118         } // END : infinite loop
119
120         fmt.Printf("Closing the consumer")
121 }
122
123 func cleanup(h *hdfs.FileWriter){
124         if h!=nil{
125                 err := h.Close()
126                 if err!=nil{
127                         fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error())
128                 }
129         }
130 }