HDFSWriter microservice working copy
[demo.git] / vnfs / DAaaS / microservices / GoApps / src / go-hdfs-writer / pkg / handler / handler.go
1 package handler
2
3
4 import (
5         "fmt"
6         "net/http"
7         "io/ioutil"
8         "encoding/json"
9         "github.com/gorilla/mux"
10
11         guuid "github.com/google/uuid"
12         pipeline "hdfs-writer/pkg/pipeline"
13         utils "hdfs-writer/pkg/utils"
14 )
15
16
17 var slogger = utils.GetLoggerInstance()
18 // ChannelMap is the global map to store writerNames as key and channels as values.
19 var ChannelMap =make(map[string]chan struct{})
20
21
22 // This is a sample test request handler
23 func testFunc(w http.ResponseWriter, r *http.Request){
24         slogger.Info("Invoking testFunc ...")
25         w.WriteHeader(http.StatusOK)
26         fmt.Fprintln(w,"HTTP Test successful ")
27 }
28
29 // CreateRouter returns a http handler for the registered URLs
30 func CreateRouter() http.Handler{
31         router := mux.NewRouter().StrictSlash(true)
32         slogger.Info("Created router ...")
33         router.HandleFunc("/test", testFunc).Methods("GET")
34         router.HandleFunc("/createWriter", createWriter).Methods("POST")
35         router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
36         return router
37 }
38
39
40 // CreateWriter creates a pipeline
41 func createWriter(w http.ResponseWriter, r *http.Request){
42         reqBody, _ := ioutil.ReadAll(r.Body)
43         slogger.Info(string(reqBody))
44         var results map[string]interface{}
45         json.Unmarshal(reqBody, &results)
46         if len(results)==0{
47                 slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
48         }
49         writerStr := "writer"
50         writer := results[writerStr].(map[string]interface{})
51         kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
52         hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
53
54         kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
55         hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
56
57         //populate the channelMap
58         pipelineChan := make(chan struct{})
59         slogger.Infof("Channel created by post :: %v", pipelineChan)
60         uuid := guuid.New().String()
61         //slogger.Infof("guuid :: %s",uuid)
62         slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
63         writerName := writerStr+"-"+uuid[len(uuid)-4:]
64         slogger.Infof("::writerName:: %s ",writerName)
65         ChannelMap[writerName] = pipelineChan
66         
67         // envoke the go routine to build pipeline
68         go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
69         successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
70         w.WriteHeader(http.StatusOK)
71         fmt.Fprintln(w,successMessage)
72 }
73
74
75 // deleteWriter deletes a given writer pipeline
76 func deleteWriter(w http.ResponseWriter, r *http.Request){
77         vars := mux.Vars(r)
78         writerName := vars["writerName"]
79         if _, keyExists := ChannelMap[writerName]; keyExists{
80                 slogger.Infof("::Writer to be closed:: %s",writerName)
81                 toBeClosedChannel := ChannelMap[writerName]
82                 close(toBeClosedChannel)
83                 // deleting the channel from ChannelMap after closure to 
84                 // avoid closing the closed channel
85                 delete(ChannelMap, writerName)
86
87                 w.WriteHeader(http.StatusOK)
88                 deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
89                 fmt.Fprintln(w,deleteMessage)
90                 
91         }else{
92                 notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
93                 fmt.Fprintln(w,notFoundMessage)
94         }
95         
96 }