9 "github.com/gorilla/mux"
11 guuid "github.com/google/uuid"
12 pipeline "hdfs-writer/pkg/pipeline"
13 utils "hdfs-writer/pkg/utils"
17 var slogger = utils.GetLoggerInstance()
18 // ChannelMap is the global map to store writerNames as key and channels as values.
19 var ChannelMap =make(map[string]chan struct{})
22 // This is a sample test request handler
23 func testFunc(w http.ResponseWriter, r *http.Request){
24 slogger.Info("Invoking testFunc ...")
25 w.WriteHeader(http.StatusOK)
26 fmt.Fprintln(w,"HTTP Test successful ")
29 // CreateRouter returns a http handler for the registered URLs
30 func CreateRouter() http.Handler{
31 router := mux.NewRouter().StrictSlash(true)
32 slogger.Info("Created router ...")
33 router.HandleFunc("/test", testFunc).Methods("GET")
34 router.HandleFunc("/createWriter", createWriter).Methods("POST")
35 router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
40 // CreateWriter creates a pipeline
41 func createWriter(w http.ResponseWriter, r *http.Request){
42 reqBody, _ := ioutil.ReadAll(r.Body)
43 slogger.Info(string(reqBody))
44 var results map[string]interface{}
45 json.Unmarshal(reqBody, &results)
47 slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
50 writer := results[writerStr].(map[string]interface{})
51 kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
52 hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
54 kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
55 hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
57 //populate the channelMap
58 pipelineChan := make(chan struct{})
59 slogger.Infof("Channel created by post :: %v", pipelineChan)
60 uuid := guuid.New().String()
61 //slogger.Infof("guuid :: %s",uuid)
62 slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
63 writerName := writerStr+"-"+uuid[len(uuid)-4:]
64 slogger.Infof("::writerName:: %s ",writerName)
65 ChannelMap[writerName] = pipelineChan
67 // envoke the go routine to build pipeline
68 go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
69 successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
70 w.WriteHeader(http.StatusOK)
71 fmt.Fprintln(w,successMessage)
75 // deleteWriter deletes a given writer pipeline
76 func deleteWriter(w http.ResponseWriter, r *http.Request){
78 writerName := vars["writerName"]
79 if _, keyExists := ChannelMap[writerName]; keyExists{
80 slogger.Infof("::Writer to be closed:: %s",writerName)
81 toBeClosedChannel := ChannelMap[writerName]
82 close(toBeClosedChannel)
83 // deleting the channel from ChannelMap after closure to
84 // avoid closing the closed channel
85 delete(ChannelMap, writerName)
87 w.WriteHeader(http.StatusOK)
88 deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
89 fmt.Fprintln(w,deleteMessage)
92 notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
93 fmt.Fprintln(w,notFoundMessage)