HDFS-WriterApp-Fixed all the code review comments
[demo.git] / vnfs / DAaaS / microservices / GoApps / src / go-hdfs-writer / pkg / handler / handler.go
index 65021b4..568cb8c 100644 (file)
 package handler
 
-
 import (
-       "fmt"
-       "net/http"
-       "io/ioutil"
        "encoding/json"
+       "fmt"
        "github.com/gorilla/mux"
+       "io/ioutil"
+       "net/http"
+       "strings"
 
-       guuid "github.com/google/uuid"
        pipeline "hdfs-writer/pkg/pipeline"
        utils "hdfs-writer/pkg/utils"
 )
 
-
 var slogger = utils.GetLoggerInstance()
-// ChannelMap is the global map to store writerNames as key and channels as values.
-var ChannelMap =make(map[string]chan struct{})
-
-
-// This is a sample test request handler
-func testFunc(w http.ResponseWriter, r *http.Request){
-       slogger.Info("Invoking testFunc ...")
-       w.WriteHeader(http.StatusOK)
-       fmt.Fprintln(w,"HTTP Test successful ")
-}
 
 // CreateRouter returns a http handler for the registered URLs
-func CreateRouter() http.Handler{
+func CreateRouter() http.Handler {
        router := mux.NewRouter().StrictSlash(true)
        slogger.Info("Created router ...")
-       router.HandleFunc("/test", testFunc).Methods("GET")
-       router.HandleFunc("/createWriter", createWriter).Methods("POST")
-       router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
+       router.HandleFunc("/v1/writer", createWriter).Methods("POST")
+       router.HandleFunc("/v1/writer/{writerName}", deleteWriter).Methods("DELETE")
+       router.HandleFunc("/v1/writers", getAllWriters).Methods("GET")
        return router
 }
 
-
 // CreateWriter creates a pipeline
-func createWriter(w http.ResponseWriter, r *http.Request){
-       reqBody, _ := ioutil.ReadAll(r.Body)
+func createWriter(w http.ResponseWriter, r *http.Request) {
+       if r.Body == nil {
+               http.Error(w, "Empty body", http.StatusBadRequest)
+               return
+       }
+       reqBody, err := ioutil.ReadAll(r.Body)
+       if err != nil {
+               http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+               return
+       }
        slogger.Info(string(reqBody))
-       var results map[string]interface{}
-       json.Unmarshal(reqBody, &results)
-       if len(results)==0{
-               slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
+       var results utils.Pipeline
+       error := json.Unmarshal(reqBody, &results)
+       if error != nil {
+               unableToParse := fmt.Sprintf("Could not unmarshal the JSON in create request :: %s", err.Error())
+               fmt.Fprintln(w, unableToParse)
+               return
        }
-       writerStr := "writer"
-       writer := results[writerStr].(map[string]interface{})
-       kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
-       hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
-
-       kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
-       hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
-
-       //populate the channelMap
-       pipelineChan := make(chan struct{})
-       slogger.Infof("Channel created by post :: %v", pipelineChan)
-       uuid := guuid.New().String()
-       //slogger.Infof("guuid :: %s",uuid)
-       slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
-       writerName := writerStr+"-"+uuid[len(uuid)-4:]
-       slogger.Infof("::writerName:: %s ",writerName)
-       ChannelMap[writerName] = pipelineChan
-       
-       // envoke the go routine to build pipeline
-       go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
+       if validateKafkaConfig(results.KafkaConfiguration) == false {
+               http.Error(w, "Validation failed for kafka config items, check logs ..", http.StatusBadRequest)
+               return
+       }
+       if validateHdfsConfig(results.HdfsConfiguration) == false {
+               http.Error(w, "Validation failed for hdfs config items, check logs ..", http.StatusBadRequest)
+               return
+       }
+       writerName := pipeline.CreatePipeline(results.KafkaConfiguration, results.HdfsConfiguration)
        successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
-       w.WriteHeader(http.StatusOK)
-       fmt.Fprintln(w,successMessage)
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusCreated)
+       fmt.Fprintln(w, successMessage)
 }
 
-
 // deleteWriter deletes a given writer pipeline
-func deleteWriter(w http.ResponseWriter, r *http.Request){
+func deleteWriter(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        writerName := vars["writerName"]
-       if _, keyExists := ChannelMap[writerName]; keyExists{
-               slogger.Infof("::Writer to be closed:: %s",writerName)
-               toBeClosedChannel := ChannelMap[writerName]
-               close(toBeClosedChannel)
-               // deleting the channel from ChannelMap after closure to 
-               // avoid closing the closed channel
-               delete(ChannelMap, writerName)
-
+       if _, keyExists := pipeline.ChannelMap[writerName]; keyExists {
+               pipeline.DeletePipeline(writerName)
                w.WriteHeader(http.StatusOK)
-               deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
-               fmt.Fprintln(w,deleteMessage)
-               
-       }else{
-               notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
-               fmt.Fprintln(w,notFoundMessage)
+               deleteMessage := fmt.Sprintf("Deleted writer :: %s", writerName)
+               fmt.Fprintln(w, deleteMessage)
+       } else {
+               notFoundMessage := fmt.Sprintf("Could not find writer :: %s", writerName)
+               fmt.Fprintln(w, notFoundMessage)
+
+       }
+}
+
+// validateKafkaConfig validates the kafka config items and returns true if they are valid.
+func validateKafkaConfig(k utils.KafkaConfig) bool {
+       if strings.TrimSpace(k.Broker) == "" {
+               fmt.Println("Broker is empty!")
+               slogger.Infof("Broker is empty!")
+               return false
+       }
+       if strings.TrimSpace(k.Group) == "" {
+               fmt.Println("Group is empty!")
+               slogger.Infof("Group is empty!")
+               return false
        }
-       
-}
\ No newline at end of file
+       if strings.TrimSpace(k.Topic) == "" {
+               fmt.Println("Topic is empty!")
+               slogger.Infof("Topic is empty!")
+               return false
+       }
+       return true
+}
+
+// validateHdfsConfig validates the kafka config items and returns true if they are valid.
+func validateHdfsConfig(h utils.HdfsConfig) bool {
+       if strings.TrimSpace(h.HdfsURL) == "" {
+               fmt.Println("HdfsURL is empty!")
+               return false
+       }
+       return true
+}
+
+// getAllWriters list down the active writers
+func getAllWriters(w http.ResponseWriter, r *http.Request) {
+       slogger.Info("Listing all the writers  ...")
+       var listOfWriters []string
+       for k := range pipeline.ChannelMap {
+               listOfWriters = append(listOfWriters, k)
+       }
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       w.Write([]byte(fmt.Sprintf(`{"Writers" : "%v"}`, listOfWriters)))
+}