5 "github.com/colinmarc/hdfs"
6 "github.com/confluentinc/confluent-kafka-go/kafka"
7 guuid "github.com/google/uuid"
11 utils "hdfs-writer/pkg/utils"
// slogger is the package-wide logger instance obtained from the utils package.
var slogger = utils.GetLoggerInstance()

// ChannelMap is the global map to store writerNames as key and channels as values.
// Each entry maps a writer name (see CreatePipeline) to the stop channel that
// DeletePipeline uses to signal that writer's goroutine.
// NOTE(review): ChannelMap is read and mutated from multiple goroutines
// (CreatePipeline, buildWriterPipeline, DeletePipeline) without any lock —
// confirm whether a sync.Mutex / sync.Map is needed here.
//var ChannelMap =make(map[string]chan struct{})
var ChannelMap = make(map[string]chan bool)

// Wg is of type WaitGroup ensures all the writers have enough time to cleanup a
// writerStr is the prefix used when generating writer names ("writer-<suffix>").
var writerStr = "writer"
// CreatePipeline initiates the building of a pipeline: it generates a unique
// writer name, registers a stop channel for it in ChannelMap, and launches
// buildWriterPipeline in its own goroutine. The declared string result is
// presumably the generated writerName, but the return statement is on a line
// not visible in this view — confirm.
func CreatePipeline(kc utils.KafkaConfig, hc utils.HdfsConfig) string {
	//pipelineChan := make(chan struct{})
	// Per-writer stop channel; DeletePipeline sends true on it to stop the writer.
	pipelineChan := make(chan bool)
	uuid := guuid.New().String()
	slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
	// Writer name is the fixed prefix plus the last 4 characters of a fresh
	// UUID, e.g. "writer-3f2a".
	writerName := writerStr + "-" + uuid[len(uuid)-4:]
	slogger.Infof("::writerName:: %s ", writerName)
	ChannelMap[writerName] = pipelineChan
	//Every create request shall add 1 to the WaitGroup
	// envoke the go routine to build pipeline
	go buildWriterPipeline(kc, hc, writerName, ChannelMap[writerName])
// buildWriterPipeline builds a pipeline: it consumes messages from the Kafka
// topic configured in k and appends each message payload, newline-terminated,
// to an HDFS file named "/<topic>" on the cluster configured in h. It runs
// until a stop signal arrives on sigchan (sent by DeletePipeline), at which
// point any open HDFS writer is closed via cleanup.
//
// NOTE(review): several lines of this function (the consumer loop, select and
// case headers, and some error checks) fall on lines not visible in this view;
// comments below are hedged accordingly.
func buildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan bool) {
	// Single-topic subscription; topics[0] is presumably assigned k.Topic on a
	// line not visible here — confirm.
	topics := make([]string, 1)
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     k.Broker,
		"broker.address.family": "v4",
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest"})
	// Consumer-creation failure path: report on stderr and the logger, then
	// deregister this writer so DeletePipeline cannot signal a dead pipeline.
	// (The enclosing `if err != nil` check is not visible in this view.)
	fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
	slogger.Info("Failed to create consumer: %s", err.Error())
	delete(ChannelMap, writerName)
	fmt.Printf("Created Consumer %v\n", c)
	// NOTE(review): no check of this SubscribeTopics error is visible in this
	// view — confirm it is handled.
	err = c.SubscribeTopics(topics, nil)
	// Flipped after the first successful HDFS write so the one-time
	// "pipeline is setup" message is not repeated (see below).
	setUpPipeline := false
	var hdfsFileWriter *hdfs.FileWriter
	var hdfsFileWriterError error
	// HDFS CLIENT CREATION
	//client := utils.GetHdfsClientInstance(h.GetHdfsURL())
	client := utils.CreateHdfsClient(h.HdfsURL)
	// Stop request from DeletePipeline: close the HDFS writer if one was ever
	// opened, then terminate this goroutine.
	case sig := <-sigchan:
		if hdfsFileWriter != nil {
			cleanup(hdfsFileWriter)
		slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
	//slogger.Info("Running default option ....")
	//:: BEGIN : Switch between different types of messages that come out of kafka
	switch e := ev.(type) {
	// *kafka.Message arm (case label not visible in this view): append the
	// message payload to the HDFS file for this topic.
		slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
		dataStr := string(e.Value)
		slogger.Infof("byte array ::: %s", []byte(dataStr))
		// Stat is used as an existence probe for "/<topic>".
		fileInfo, fileInfoError := client.Stat("/" + k.Topic)
		// create file if it doesnt exists already
		if fileInfoError != nil {
			slogger.Infof("Error::: %s", fileInfoError)
			slogger.Infof("Creating file::: %s", "/"+k.Topic)
			hdfsFileWriterError = client.CreateEmptyFile("/" + k.Topic)
			if hdfsFileWriterError != nil {
				slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
					"/"+k.Topic, hdfsFileWriterError.Error())
			// Make the new file world-writable; the Chmod error is
			// deliberately discarded (best effort).
			_ = client.Chmod("/"+k.Topic, 0777)
		// Each Kafka message becomes one newline-terminated record.
		newDataStr := dataStr + "\n"
		// file exists case, so just append
		hdfsFileWriter, hdfsFileWriterError = client.Append("/" + fileInfo.Name())
		if hdfsFileWriterError != nil {
			if hdfsFileWriter == nil {
				slogger.Infof("hdfsFileWriter is NULL !!")
			slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
				"/"+k.Topic, hdfsFileWriterError)
		// NOTE(review): `error` shadows the builtin error type name here;
		// renaming to writeErr would be cleaner (not changed in this pass).
		bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
		if bytesWritten > 0 && error == nil {
			slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
			slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
			// First successful write: announce that the pipeline is live.
			if setUpPipeline == false {
				slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
					"watching for more messages.. ", k.Topic, h.HdfsURL)
			// Writer is closed after every message; a fresh Append writer is
			// opened for the next one.
			hdfsFileWriter.Close()
			slogger.Info("::: Unable to write to HDFS\n :::Error:: %s", error)
	// kafka.Error arm (case label not visible in this view):
	// Errors should generally be considered
	// informational, the client will try to
	// automatically recover.
	// But in this example we choose to terminate
	// the application if all brokers are down.
		fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
		if e.Code() == kafka.ErrAllBrokersDown {
	// Any other event type is logged and ignored.
		fmt.Printf("Ignored %v\n", e)
	} //:: END : Switch between different types of messages that come out of kafka
	} // END: select channel
} // END : infinite loop
// cleanup closes the given HDFS file writer so buffered data is flushed before
// the owning writer goroutine exits.
// NOTE(review): the Close() call and the declaration of `err` are on lines not
// visible in this view; `err` presumably comes from h.Close() — confirm.
func cleanup(h *hdfs.FileWriter) {
	fmt.Printf(":::Error occured while closing hdfs writer::: \n%s", err.Error())
	fmt.Printf("\n:::Clean up executed ::: \n")
// DeletePipeline deletes a writer pipeline: it signals the goroutine
// registered under writerName to stop (via its ChannelMap channel) and then
// removes the entry from ChannelMap.
//
// NOTE(review): if writerName is not present in ChannelMap, the lookup yields
// a nil channel and the send below blocks forever — a comma-ok check on the
// map lookup would make this safe. The send is also unbuffered-style blocking:
// if the writer goroutine already exited without receiving (e.g. consumer
// creation failed and it deleted its own entry), this call deadlocks.
func DeletePipeline(writerName string) {
	slogger.Infof("::Writer to be closed:: %s", writerName)
	toBeClosedChannel := ChannelMap[writerName]
	// Blocks until buildWriterPipeline receives the stop signal.
	toBeClosedChannel <- true
	// deleting the channel from ChannelMap after closure to
	// avoid closing the closed channel
	delete(ChannelMap, writerName)