HDFS-WriterApp-Fixed all the code review comments
[demo.git] / vnfs / DAaaS / microservices / GoApps / src / go-hdfs-writer / pkg / handler / handler.go
1 package handler
2
3 import (
4         "encoding/json"
5         "fmt"
6         "github.com/gorilla/mux"
7         "io/ioutil"
8         "net/http"
9         "strings"
10
11         pipeline "hdfs-writer/pkg/pipeline"
12         utils "hdfs-writer/pkg/utils"
13 )
14
15 var slogger = utils.GetLoggerInstance()
16
17 // CreateRouter returns a http handler for the registered URLs
18 func CreateRouter() http.Handler {
19         router := mux.NewRouter().StrictSlash(true)
20         slogger.Info("Created router ...")
21         router.HandleFunc("/v1/writer", createWriter).Methods("POST")
22         router.HandleFunc("/v1/writer/{writerName}", deleteWriter).Methods("DELETE")
23         router.HandleFunc("/v1/writers", getAllWriters).Methods("GET")
24         return router
25 }
26
27 // CreateWriter creates a pipeline
28 func createWriter(w http.ResponseWriter, r *http.Request) {
29         if r.Body == nil {
30                 http.Error(w, "Empty body", http.StatusBadRequest)
31                 return
32         }
33         reqBody, err := ioutil.ReadAll(r.Body)
34         if err != nil {
35                 http.Error(w, err.Error(), http.StatusUnprocessableEntity)
36                 return
37         }
38         slogger.Info(string(reqBody))
39         var results utils.Pipeline
40         error := json.Unmarshal(reqBody, &results)
41         if error != nil {
42                 unableToParse := fmt.Sprintf("Could not unmarshal the JSON in create request :: %s", err.Error())
43                 fmt.Fprintln(w, unableToParse)
44                 return
45         }
46         if validateKafkaConfig(results.KafkaConfiguration) == false {
47                 http.Error(w, "Validation failed for kafka config items, check logs ..", http.StatusBadRequest)
48                 return
49         }
50         if validateHdfsConfig(results.HdfsConfiguration) == false {
51                 http.Error(w, "Validation failed for hdfs config items, check logs ..", http.StatusBadRequest)
52                 return
53         }
54         writerName := pipeline.CreatePipeline(results.KafkaConfiguration, results.HdfsConfiguration)
55         successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
56         w.Header().Set("Content-Type", "application/json")
57         w.WriteHeader(http.StatusCreated)
58         fmt.Fprintln(w, successMessage)
59 }
60
61 // deleteWriter deletes a given writer pipeline
62 func deleteWriter(w http.ResponseWriter, r *http.Request) {
63         vars := mux.Vars(r)
64         writerName := vars["writerName"]
65         if _, keyExists := pipeline.ChannelMap[writerName]; keyExists {
66                 pipeline.DeletePipeline(writerName)
67                 w.WriteHeader(http.StatusOK)
68                 deleteMessage := fmt.Sprintf("Deleted writer :: %s", writerName)
69                 fmt.Fprintln(w, deleteMessage)
70         } else {
71                 notFoundMessage := fmt.Sprintf("Could not find writer :: %s", writerName)
72                 fmt.Fprintln(w, notFoundMessage)
73
74         }
75 }
76
77 // validateKafkaConfig validates the kafka config items and returns true if they are valid.
78 func validateKafkaConfig(k utils.KafkaConfig) bool {
79         if strings.TrimSpace(k.Broker) == "" {
80                 fmt.Println("Broker is empty!")
81                 slogger.Infof("Broker is empty!")
82                 return false
83         }
84         if strings.TrimSpace(k.Group) == "" {
85                 fmt.Println("Group is empty!")
86                 slogger.Infof("Group is empty!")
87                 return false
88         }
89         if strings.TrimSpace(k.Topic) == "" {
90                 fmt.Println("Topic is empty!")
91                 slogger.Infof("Topic is empty!")
92                 return false
93         }
94         return true
95 }
96
97 // validateHdfsConfig validates the kafka config items and returns true if they are valid.
98 func validateHdfsConfig(h utils.HdfsConfig) bool {
99         if strings.TrimSpace(h.HdfsURL) == "" {
100                 fmt.Println("HdfsURL is empty!")
101                 return false
102         }
103         return true
104 }
105
106 // getAllWriters list down the active writers
107 func getAllWriters(w http.ResponseWriter, r *http.Request) {
108         slogger.Info("Listing all the writers  ...")
109         var listOfWriters []string
110         for k := range pipeline.ChannelMap {
111                 listOfWriters = append(listOfWriters, k)
112         }
113         w.Header().Set("Content-Type", "application/json")
114         w.WriteHeader(http.StatusOK)
115         w.Write([]byte(fmt.Sprintf(`{"Writers" : "%v"}`, listOfWriters)))
116 }