HDFSWriter microservice working copy 90/97090/2
author Rajamohan Raj <rajamohan.raj@intel.com>
Tue, 15 Oct 2019 00:48:18 +0000 (00:48 +0000)
committer Marco Platania <platania@research.att.com>
Tue, 15 Oct 2019 13:30:04 +0000 (13:30 +0000)
Issue-ID: ONAPARC-453
Signed-off-by: Rajamohan Raj <rajamohan.raj@intel.com>
Change-Id: I11c91b642e466763c1ca6f5734bf81fb260e2b39

25 files changed:
vnfs/DAaaS/README.md
vnfs/DAaaS/deploy/training-core/charts/m3db/values.yaml
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml [new file with mode: 0644]
vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/.gitignore [deleted file]
vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/Makefile [deleted file]
vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/README.md [deleted file]
vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/cmd/main.go [deleted file]
vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/go.mod [deleted file]

index 91b5817..60c237b 100644 (file)
@@ -79,7 +79,7 @@ kubectl get crds | grep rook
 ```
 If this return results like :
 ```
-otc@otconap7 /var/lib/rook $  kc get crds | grep rook
+otc@otconap7 /var/lib/rook $  kubectl get crds | grep rook
 cephblockpools.ceph.rook.io         2019-07-19T18:19:05Z
 cephclusters.ceph.rook.io           2019-07-19T18:19:05Z
 cephfilesystems.ceph.rook.io        2019-07-19T18:19:05Z
@@ -91,7 +91,7 @@ then you should delete these previously existing rook based CRDs by generating a
 manifest file by these commands and then deleting those files:
 ```
 helm template -n rook . -f values.yaml > ~/delete.yaml
-kc delete -f ~/delete.yaml
+kubectl delete -f ~/delete.yaml
 ```
 
 After this, delete the below directory in all the nodes.
@@ -124,7 +124,7 @@ IMAGE_NAME=dcr.cluster.local:32644/visualization-operator:latest
 
 ### Install the Operator Package
 ```bash
-cd $DA_WORKING_DIR/operator
+cd $DA_WORKING_DIR/deploy/operator
 helm install -n operator . -f values.yaml --namespace=operator
 ```
 Check for the status of the pods in operator namespace. Check if Prometheus operator pods are in Ready state.
@@ -157,7 +157,7 @@ Note: Collectd.conf is avaliable in $DA_WORKING_DIR/collection/charts/collectd/r
 ```bash
 Default (For custom collectd skip this section)
 =======
-cd $DA_WORKING_DIR/collection
+cd $DA_WORKING_DIR/deploy/collection
 helm install -n cp . -f values.yaml --namespace=edge1
 
 Custom Collectd
index a2f553a..d3a7bbc 100644 (file)
@@ -2,7 +2,7 @@ m3dbCluster:
   name: m3db-cluster
   image:
     repository: quay.io/m3db/m3dbnode
-    tag: latest
+    tag: v0.10.2
   replicationFactor: 3
   numberOfShards: 256
   isolationGroups:
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/Dockerfile
new file mode 100644 (file)
index 0000000..ee47671
--- /dev/null
@@ -0,0 +1,38 @@
+# Use base golang image from Docker Hub
+FROM golang:1.12.7
+
+# Download the dlv (delve) debugger for go (you can comment this out if unused)
+RUN go get -u github.com/go-delve/delve/cmd/dlv
+
+WORKDIR /src/hdfs-writer
+
+# Build librdkafka from source (needed by confluent-kafka-go's cgo bindings).
+# NOTE(review): the "cd" below has no effect - each RUN starts a fresh shell
+# in WORKDIR, so the clone actually happens under /src/hdfs-writer.
+# NOTE(review): the first continuation line of the next RUN ends in "\ "
+# (trailing space after the backslash), which can break line continuation.
+RUN mkdir /librdkafka-dir && cd /librdkafka-dir
+RUN git clone https://github.com/edenhill/librdkafka.git && \ 
+cd librdkafka && \
+./configure --prefix /usr && \
+make && \
+make install
+
+# Install dependencies in go.mod and go.sum
+COPY go.mod go.sum ./
+RUN go mod download
+
+# Copy rest of the application source code
+COPY . ./
+
+# Compile the application to /hdfs-writer.
+RUN go build -o /hdfs-writer -v ./cmd/hdfs-writer
+
+# If you want to use the debugger, you need to modify the entrypoint to the
+# container and point it to the "dlv debug" command:
+#   * UNCOMMENT the following ENTRYPOINT statement,
+#   * COMMENT OUT the last ENTRYPOINT statement
+# Start the "dlv debug" server on port 3000 of the container. (Note that the
+# application process will NOT start until the debugger is attached.)
+#ENTRYPOINT ["dlv", "debug", "./cmd/hdfs-writer",  "--api-version=2", "--headless", "--listen=:3001", "--log", "--log-dest=/home.dlv.log"]
+
+# If you want to run WITHOUT the debugging server:
+#   * COMMENT OUT the previous ENTRYPOINT statements,
+#   * UNCOMMENT the following ENTRYPOINT statement.
+#ENTRYPOINT ["/bin/sleep", "3600"]
+ENTRYPOINT ["/hdfs-writer"]
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/cmd/hdfs-writer/main.go
new file mode 100644 (file)
index 0000000..a79a3e0
--- /dev/null
@@ -0,0 +1,51 @@
+package main
+
+import (
+       "context"
+       "fmt"
+       "net/http"
+       "os"
+       "os/signal"
+       "time"
+
+       handler "hdfs-writer/pkg/handler"
+       utils "hdfs-writer/pkg/utils"
+)
+
+// main starts the HTTP control server for the HDFS writer on port 9393
+// and installs a SIGINT handler that closes every active writer pipeline
+// registered in handler.ChannelMap before shutting the server down.
+func main() {
+
+       slogger := utils.GetLoggerInstance()
+
+       // Create the server; CreateRouter registers the /test,
+       // /createWriter and /deleteWriter endpoints.
+       httpServer := &http.Server{
+               Handler: handler.CreateRouter(),
+               Addr:    ":9393",
+       }
+
+       connectionsClose := make(chan struct{})
+       go func() {
+               c := make(chan os.Signal, 1)
+               signal.Notify(c, os.Interrupt)
+               <-c // function literal waiting to receive Interrupt signal
+               fmt.Printf(":::Got the kill signal:::")
+               slogger.Info(":::Got the kill signal:::")
+               for eachWriter, eachChannel := range handler.ChannelMap {
+                       slogger.Infof("Closing writer goroutine :: %s", eachWriter)
+                       slogger.Infof("eachChannel:: %v", eachChannel)
+                       close(eachChannel)
+                       // This wait time ensures that the each of the channel is killed before
+                       // main routine finishes.
+                       // NOTE(review): a fixed 5s sleep per writer does not guarantee the
+                       // goroutine has exited; a sync.WaitGroup would be deterministic.
+                       time.Sleep(time.Second * 5)
+               }
+               //once all goroutines are signalled, send close to main thread
+               // NOTE(review): Shutdown's error return value is ignored here.
+               httpServer.Shutdown(context.Background())
+               close(connectionsClose)
+       }()
+
+       // Server starts listening
+       err := httpServer.ListenAndServe()
+       if err != nil && err != http.ErrServerClosed {
+               slogger.Fatal(err)
+       }
+       <-connectionsClose //main thread waiting to receive close signal
+}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.mod
new file mode 100644 (file)
index 0000000..75df203
--- /dev/null
@@ -0,0 +1,17 @@
+module hdfs-writer
+
+go 1.12
+
+require (
+       github.com/colinmarc/hdfs v1.1.3
+       github.com/confluentinc/confluent-kafka-go v1.1.0
+       github.com/golang/protobuf v1.3.2 // indirect
+       github.com/google/uuid v1.1.1
+       github.com/gorilla/mux v1.7.3
+       github.com/pkg/errors v0.8.1 // indirect
+       github.com/stretchr/testify v1.3.0 // indirect
+       go.uber.org/atomic v1.4.0 // indirect
+       go.uber.org/multierr v1.1.0 // indirect
+       go.uber.org/zap v1.10.0
+       gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0
+)
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/go.sum
new file mode 100644 (file)
index 0000000..53cc1db
--- /dev/null
@@ -0,0 +1,27 @@
+github.com/colinmarc/hdfs v1.1.3 h1:662salalXLFmp+ctD+x0aG+xOg62lnVnOJHksXYpFBw=
+github.com/colinmarc/hdfs v1.1.3/go.mod h1:0DumPviB681UcSuJErAbDIOx6SIaJWj463TymfZG02I=
+github.com/confluentinc/confluent-kafka-go v1.1.0 h1:HIW7Nkm8IeKRotC34mGY06DwQMf9Mp9PZMyqDxid2wI=
+github.com/confluentinc/confluent-kafka-go v1.1.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
+github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
+go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
+go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
+go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
+go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
+gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0 h1:roy97m/3wj9/o8OuU3sZ5wildk30ep38k2x8nhNbKrI=
+gopkg.in/confluentinc/confluent-kafka-go.v1 v1.1.0/go.mod h1:ZdI3yfYmdNSLQPNCpO1y00EHyWaHG5EnQEyL/ntAegY=
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_hdfs.yaml
new file mode 100644 (file)
index 0000000..1fdd7ca
--- /dev/null
@@ -0,0 +1,10 @@
+# Preserving a sample config of hdfs
+#NOTE : The hdfs config shall come through the REST request as part of writer config
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: configmap-hdfs
+data:
+  #hdfs_url: hdfs1-namenode-1.hdfs1-namenode.hdfs1:8020
+  hdfs_url: hdfs1-namenode:8020
+  
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_kafka.yaml
new file mode 100644 (file)
index 0000000..503896b
--- /dev/null
@@ -0,0 +1,11 @@
+# Preserving a sample config of kafka broker
+#NOTE : The kafka config shall come through the REST request as part of writer config
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: configmap-kafka
+data:
+  #broker: kafka-cluster-kafka-1.kafka-cluster-kafka-brokers.hdfs1.svc.cluster.local:9092
+  broker: kafka-cluster-kafka-bootstrap:9092
+  group: grp1i
+  topic: newTopc9
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/configmap_writer.yaml
new file mode 100644 (file)
index 0000000..f5d0833
--- /dev/null
@@ -0,0 +1,22 @@
+#Preserving a sample config of writer. 
+#NOTE : The writer config shall come through the REST request
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: config-json
+data:
+  config.json: |-
+    [
+    {"writer": {
+            "kafkaConfig": {
+                "broker": "kafka-cluster-kafka-bootstrap:9092",
+                "group": "grp1",
+                "topic": "newTopic9"
+            },
+            "hdfsConfig": {
+                "hdfs_url": "hdfs1-namenode:8020"
+            }
+        }
+    }
+    ]
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_deployment.yaml
new file mode 100644 (file)
index 0000000..393a1d7
--- /dev/null
@@ -0,0 +1,63 @@
+# This Deployment manifest defines:
+# - single-replica deployment of the container image, with label "app: hdfs-writer"
+# - Pod exposes port 8080
+# - specify PORT environment variable to the container process
+# Syntax reference https://kubernetes.io/docs/concepts/configuration/overview/
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: go-hdfs-writer
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: hdfs-writer
+  template:
+    metadata:
+      labels:
+        app: hdfs-writer
+    spec:
+      containers:
+      - name: server
+        image: hdfs-writer
+        volumeMounts:
+          - name: config-volume
+            mountPath: src/hdfs-writer/cmd/hdfs-writer/config.json
+            subPath: config.json
+        ports:
+        - containerPort: 8080
+        env:
+        - name: PORT
+          value: "8080"
+        - name: BROKER
+          valueFrom:
+            configMapKeyRef:  
+              name: configmap-kafka
+              key: broker
+        - name: GROUP
+          valueFrom:
+            configMapKeyRef:  
+              name: configmap-kafka
+              key: group
+        - name: TOPIC
+          valueFrom:
+            configMapKeyRef:  
+              name: configmap-kafka
+              key: topic
+        - name: HDFS_URL
+          valueFrom:
+            configMapKeyRef:  
+              name: configmap-hdfs
+              key: hdfs_url
+        resources:
+          requests:
+            memory: "640Mi"
+            cpu: "2500m"
+          limits:
+            memory: "1280Mi"
+            cpu: "5000m"
+      volumes: 
+        - name: config-volume
+          configMap:
+            name: config-json
+      terminationGracePeriodSeconds: 3
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/kubernetes-manifests/hdfs_writer_service.yaml
new file mode 100644 (file)
index 0000000..596ad5f
--- /dev/null
@@ -0,0 +1,14 @@
+# This is required for testing using the POSTMAN
+kind: Service
+apiVersion: v1
+metadata:
+  name: hdfs-writer-svc
+spec:
+  type: NodePort
+  selector:
+    app: hdfs-writer
+  ports:
+    - nodePort: 30303
+      port: 9393
+      targetPort: 9393
+      
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/handler/handler.go
new file mode 100644 (file)
index 0000000..65021b4
--- /dev/null
@@ -0,0 +1,96 @@
+package handler
+
+
+import (
+       "fmt"
+       "net/http"
+       "io/ioutil"
+       "encoding/json"
+       "github.com/gorilla/mux"
+
+       guuid "github.com/google/uuid"
+       pipeline "hdfs-writer/pkg/pipeline"
+       utils "hdfs-writer/pkg/utils"
+)
+
+
+var slogger = utils.GetLoggerInstance()
+// ChannelMap is the global map to store writerNames as key and channels as values.
+var ChannelMap =make(map[string]chan struct{})
+
+
+// testFunc is a sample request handler used to verify the HTTP server
+// is reachable; it always responds 200 OK with a short text body.
+func testFunc(w http.ResponseWriter, r *http.Request){
+       slogger.Info("Invoking testFunc ...")
+       w.WriteHeader(http.StatusOK)
+       fmt.Fprintln(w,"HTTP Test successful ")
+}
+
+// CreateRouter returns a http handler for the registered URLs:
+//   GET    /test                      - liveness check (testFunc)
+//   POST   /createWriter              - start a kafka->HDFS writer pipeline
+//   DELETE /deleteWriter/{writerName} - stop a running writer pipeline
+func CreateRouter() http.Handler{
+       router := mux.NewRouter().StrictSlash(true)
+       slogger.Info("Created router ...")
+       router.HandleFunc("/test", testFunc).Methods("GET")
+       router.HandleFunc("/createWriter", createWriter).Methods("POST")
+       router.HandleFunc("/deleteWriter/{writerName}", deleteWriter).Methods("DELETE")
+       return router
+}
+
+
+// createWriter creates a writer pipeline from the JSON request body
+// (see sample-rest-requests/createWriter.json): it extracts the kafka
+// and hdfs configs, registers a per-writer shutdown channel in ChannelMap
+// under a generated name, and launches the pipeline goroutine.
+// Responds 200 OK with the generated writer name.
+func createWriter(w http.ResponseWriter, r *http.Request){
+       reqBody, _ := ioutil.ReadAll(r.Body) // NOTE(review): read error is discarded
+       slogger.Info(string(reqBody))
+       var results map[string]interface{}
+       json.Unmarshal(reqBody, &results) // NOTE(review): unmarshal error is ignored
+       if len(results)==0{
+               // NOTE(review): Fatalf terminates the whole process on one bad
+               // request body; an HTTP 400 response would be more appropriate.
+               slogger.Fatalf("Unable to read from the config json file, unable to create configObject map")
+       }
+       writerStr := "writer"
+       // NOTE(review): these type assertions panic if the body does not have
+       // the expected {"writer":{"kafkaConfig":..,"hdfsConfig":..}} shape.
+       writer := results[writerStr].(map[string]interface{})
+       kafkaConfigMapObj := writer["kafkaConfig"].(map[string]interface{})
+       hdfsConfigObj := writer["hdfsConfig"].(map[string]interface{})
+
+       kc := utils.SetKafkaParametersByObjectMap(kafkaConfigMapObj)
+       hc := utils.SetHdfsParametersByObjectMap(hdfsConfigObj)
+
+       //populate the channelMap
+       pipelineChan := make(chan struct{})
+       slogger.Infof("Channel created by post :: %v", pipelineChan)
+       uuid := guuid.New().String()
+       //slogger.Infof("guuid :: %s",uuid)
+       slogger.Infof(":: Storing writerName and channel in ChannelMap :: ")
+       // writer name is "writer-" plus the last 4 chars of a fresh UUID
+       writerName := writerStr+"-"+uuid[len(uuid)-4:]
+       slogger.Infof("::writerName:: %s ",writerName)
+       ChannelMap[writerName] = pipelineChan
+       
+       // invoke the go routine to build pipeline
+       go pipeline.BuildWriterPipeline(kc,hc, writerName, ChannelMap[writerName])
+       successMessage := fmt.Sprintf("Created the writer ::%s", writerName)
+       w.WriteHeader(http.StatusOK)
+       fmt.Fprintln(w,successMessage)
+}
+
+
+// deleteWriter deletes a given writer pipeline: if {writerName} is a key
+// in ChannelMap, its channel is closed (signalling the pipeline goroutine
+// to stop) and the entry is removed; otherwise a not-found message is sent.
+// NOTE(review): the not-found branch still responds 200 (the implicit
+// default); http.StatusNotFound would be more accurate.
+func deleteWriter(w http.ResponseWriter, r *http.Request){
+       vars := mux.Vars(r)
+       writerName := vars["writerName"]
+       if _, keyExists := ChannelMap[writerName]; keyExists{
+               slogger.Infof("::Writer to be closed:: %s",writerName)
+               toBeClosedChannel := ChannelMap[writerName]
+               close(toBeClosedChannel)
+               // deleting the channel from ChannelMap after closure to 
+               // avoid closing the closed channel
+               delete(ChannelMap, writerName)
+
+               w.WriteHeader(http.StatusOK)
+               deleteMessage := fmt.Sprintf("Deleted writer :: %s",writerName)
+               fmt.Fprintln(w,deleteMessage)
+               
+       }else{
+               notFoundMessage := fmt.Sprintf("Could not find writer :: %s",writerName)
+               fmt.Fprintln(w,notFoundMessage)
+       }
+       
+}
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/pipeline/pipeline.go
new file mode 100644 (file)
index 0000000..c5dbd3c
--- /dev/null
@@ -0,0 +1,130 @@
+package pipeline
+
+import (
+       "fmt"
+       "os"
+       "github.com/colinmarc/hdfs"
+       "github.com/confluentinc/confluent-kafka-go/kafka"
+       utils "hdfs-writer/pkg/utils"
+       
+)
+
+// BuildWriterPipeline builds a pipeline: it subscribes a kafka consumer to
+// k's topic and appends every received message, newline-terminated, to the
+// HDFS file "/<topic>" on the cluster at h's URL. It runs until sigchan is
+// closed (or all brokers go down) and is intended to run as a goroutine,
+// one per writer created via the /createWriter endpoint.
+func BuildWriterPipeline(k utils.KafkaConfig, h utils.HdfsConfig, writerName string, sigchan chan struct{}) {
+       slogger := utils.GetLoggerInstance()
+       topics := make([]string, 1)
+       topics[0] = k.GetTopic()
+       
+       c,err := kafka.NewConsumer(&kafka.ConfigMap{
+               "bootstrap.servers": k.GetBroker(),
+               "broker.address.family": "v4",
+               "group.id":              k.GetGroup(),
+               "session.timeout.ms":    6000,
+               "auto.offset.reset":     "earliest"})
+
+       if err != nil {
+               // NOTE(review): os.Exit in a goroutine kills the whole service;
+               // returning an error to the caller would be safer.
+               fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
+               os.Exit(1)
+       }
+       fmt.Printf("Created Consumer %v\n", c)
+       // NOTE(review): the error returned by SubscribeTopics is assigned but
+       // never checked; a failed subscription silently yields an idle loop.
+       err = c.SubscribeTopics(topics, nil)
+
+       run := true
+       setUpPipeline := false
+
+       var hdfsFileWriter *hdfs.FileWriter
+       var hdfsFileWriterError error
+       // HDFS CLIENT CREATION
+       //client := utils.GetHdfsClientInstance(h.GetHdfsURL())
+       client := utils.CreateHdfsClient(h.GetHdfsURL())
+       
+
+       for run==true {
+               select {
+               case sig := <-sigchan:
+                       // Shutdown path: sigchan was closed by deleteWriter or main.
+                       client.Close()
+                       if hdfsFileWriter!=nil{
+                               cleanup(hdfsFileWriter)
+                       }
+                       slogger.Infof("\nCaught signal %v: terminating the go-routine of writer :: %s\n", sig, writerName)
+                       run = false
+               default:
+                       //slogger.Info("Running default option ....")
+                       ev := c.Poll(100)
+                       if ev == nil {
+                               continue
+                       }
+                       //:: BEGIN : Switch between different types of messages that come out of kafka
+                       switch e := ev.(type){
+                       case *kafka.Message:
+                               slogger.Infof("::: Message on %s\n%s\n", e.TopicPartition, e.Value)
+                               dataStr := string(e.Value)
+                               slogger.Infof("byte array ::: %s", []byte(dataStr))
+                               fileInfo, fileInfoError := client.Stat("/" + k.GetTopic())
+                               // create file if it doesnt exists already
+                               if fileInfoError != nil {
+                                       slogger.Infof("Error::: %s",fileInfoError)
+                                       slogger.Infof("Creating file::: %s", "/"+k.GetTopic())
+                                       hdfsFileWriterError = client.CreateEmptyFile("/"+k.GetTopic())
+                                       if hdfsFileWriterError !=nil {
+                                               slogger.Infof("Creation of empty file ::: %s failed\n Error:: %s",
+                                                "/"+k.GetTopic(), hdfsFileWriterError.Error())
+                                               panic(fmt.Sprintf("Creation of empty file ::: %s failed", k.GetTopic()))
+                                       }
+                                       _= client.Chmod("/"+k.GetTopic(), 0777);
+                               }
+                               newDataStr := dataStr + "\n"
+                               // file exists case, so just append
+                               // NOTE(review): on the create path above, fileInfo is nil
+                               // (Stat failed), so fileInfo.Name() here dereferences a nil
+                               // pointer; k.GetTopic() should be used instead.
+                               hdfsFileWriter, hdfsFileWriterError = client.Append("/"+fileInfo.Name())
+                               
+                               if hdfsFileWriterError != nil || hdfsFileWriter==nil{
+                                       if(hdfsFileWriter==nil){
+                                               slogger.Infof("hdfsFileWriter is NULL !!")
+                                       }
+                                       slogger.Infof(":::Appending to file : %s failed:::\nError occured:::%s\n",
+                                        "/"+k.GetTopic(),hdfsFileWriterError)
+                                       panic(fmt.Sprintf("Appending to file : %s failed", k.GetTopic()))
+                               }
+                               // NOTE(review): "error" shadows the builtin error type.
+                               bytesWritten, error := hdfsFileWriter.Write([]byte(newDataStr))
+                               if bytesWritten > 0 && error == nil {
+                                       slogger.Infof("::: Wrote %s to HDFS:::", newDataStr)
+                                       slogger.Infof("::: Wrote %d bytes to HDFS:::", bytesWritten)
+                                       
+                                       if setUpPipeline==false{
+                                               slogger.Infof(" The pipeline with topic: %s and hdfs url %s is setup,"+
+                                               "watching for more messages.. ",k.GetTopic(), h.GetHdfsURL())
+                                               setUpPipeline = true
+                                       }
+                               } else {
+                                       // NOTE(review): Info does not interpolate %s; Infof is needed here.
+                                       slogger.Info("::: Unable to write to HDFS\n :::Error:: %s",error)
+                               }
+                               hdfsFileWriter.Close()
+                       
+                       case kafka.Error:
+                               // Errors should generally be considered
+                               // informational, the client will try to
+                               // automatically recover.
+                               // But in this example we choose to terminate
+                               // the application if all brokers are down.
+                               fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+                               if e.Code() == kafka.ErrAllBrokersDown {
+                                       run = false
+                               }
+                       
+                       default:
+                               fmt.Printf("Ignored %v\n", e)
+                       } //:: END : Switch between different types of messages that come out of kafka
+               } // END: select channel
+       } // END : infinite loop
+
+       // NOTE(review): the consumer c is never Close()d on exit.
+       fmt.Printf("Closing the consumer")
+}
+
+// cleanup closes the given HDFS file writer, logging (but not
+// propagating) any error. A nil writer is a safe no-op.
+func cleanup(h *hdfs.FileWriter){
+       if h!=nil{
+               err := h.Close()
+               if err!=nil{
+                       fmt.Printf(":::Error occured while closing the hdfs writer::: \n%s", err.Error())
+               }
+       }
+}
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/sample-rest-requests/createWriter.json
new file mode 100644 (file)
index 0000000..9a41d91
--- /dev/null
@@ -0,0 +1,11 @@
+{"writer": {
+    "kafkaConfig": {
+        "broker": "kafka-cluster-kafka-bootstrap:9092",
+        "group": "grp1",
+        "topic": "newTopic9"
+    },
+    "hdfsConfig": {
+        "hdfs_url": "hdfs1-namenode:8020"
+    }
+}
+}
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfs-config.go
new file mode 100644 (file)
index 0000000..ac33bc6
--- /dev/null
@@ -0,0 +1,37 @@
+package utils
+
+import (
+       "os"
+)
+
+// SetHdfsParametersByObjectMap set the value of the hdfs config parameters
+// and return HdfsConfig object.
+// NOTE(review): the type assertion panics if m["hdfs_url"] is missing or
+// not a string - callers must validate the map first.
+func SetHdfsParametersByObjectMap(m map[string]interface{}) HdfsConfig{
+
+       hc := HdfsConfig{}
+       hc.hdfsURL = m["hdfs_url"].(string)
+       return hc
+
+}
+
+// SetHdfsParametersByEnvVariables sets the hdfs parameters from the
+// HDFS_URL environment variable (empty string if unset) and returns the
+// resulting HdfsConfig.
+func SetHdfsParametersByEnvVariables() HdfsConfig {
+       
+       slogger := GetLoggerInstance()
+       hdfsConfigObject := HdfsConfig{
+               hdfsURL: os.Getenv("HDFS_URL"),
+       }
+       slogger.Infof("::hdfsURL:: %s", hdfsConfigObject.hdfsURL)
+       return hdfsConfigObject
+       
+}
+
+// HdfsConfig contains hdfs related config items.
+type HdfsConfig struct {
+       hdfsURL string // namenode address, e.g. "hdfs1-namenode:8020"
+}
+
+// GetHdfsURL returns HdfsURL.
+// NOTE(review): Go convention is a getter named HdfsURL() (no Get prefix),
+// but renaming would break existing callers.
+func (h HdfsConfig) GetHdfsURL() string {
+       return h.hdfsURL
+}
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/hdfsUtils.go
new file mode 100644 (file)
index 0000000..1a93a5a
--- /dev/null
@@ -0,0 +1,33 @@
+package utils
+
+import (
+       "fmt"
+       "github.com/colinmarc/hdfs"
+       //"sync"
+       //"go.uber.org/zap"
+)
+
+
+//var clientOnce sync.Once
+//var hdfsClient *hdfs.Client
+//var slogger *zap.SugaredLogger
+
+
+//GetHdfsClientInstance returns a singleton hdfsClient instance
+// func GetHdfsClientInstance(hdfsURL string) (*hdfs.Client){
+//     clientOnce.Do(func(){
+//             hdfsClient = createHdfsClient(hdfsURL)
+//     })
+//     return hdfsClient
+// }
+
+//CreateHdfsClient creates a hdfs client and returns hdfs client
+// NOTE(review): slogger.Fatalf exits the process, so the Printf on the
+// next line is unreachable; on failure no client is ever returned.
+func CreateHdfsClient(hdfsURL string) (*hdfs.Client){
+       slogger := GetLoggerInstance()
+       hdfsClient, hdfsConnectError := hdfs.New(hdfsURL)
+       if hdfsConnectError !=nil {
+               slogger.Fatalf(":::Error in create hdfsClient::: %v", hdfsConnectError)
+               fmt.Printf("::Unable to initialize hdfsURL, check logs")
+       }
+       return hdfsClient
+}
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/kafka-config.go
new file mode 100644 (file)
index 0000000..080bfd4
--- /dev/null
@@ -0,0 +1,55 @@
+package utils
+
+
+import (
+       "os"
+)
+
+// SetKafkaParametersByObjectMap sets the  value of the kafka parameters
+// and sets the KafkaConfig object.
+// NOTE(review): the type assertions panic if "broker", "group" or "topic"
+// is missing or not a string - callers must validate the map first.
+func SetKafkaParametersByObjectMap(m map[string]interface{}) KafkaConfig {
+       kc := KafkaConfig{}
+       kc.broker = m["broker"].(string)
+       kc.group = m["group"].(string)
+       kc.topic = m["topic"].(string)
+
+       return kc
+}
+
+// SetKafkaParametersByEnvVariables sets the kafka parameters from the
+// BROKER, GROUP and TOPIC environment variables (empty strings if unset)
+// and returns the resulting KafkaConfig.
+func SetKafkaParametersByEnvVariables() KafkaConfig {
+       slogger := GetLoggerInstance()
+       
+       kafkaConfigObject := KafkaConfig{
+               broker: os.Getenv("BROKER"),
+               group: os.Getenv("GROUP"),
+               topic: os.Getenv("TOPIC"),
+       }
+       slogger.Infof("::broker:: %s", kafkaConfigObject.broker)
+       slogger.Infof("::group:: %s", kafkaConfigObject.group)
+       slogger.Infof("::topic:: %s", kafkaConfigObject.topic)
+
+       return kafkaConfigObject
+}
+
+// KafkaConfig contains all the config parameters needed for kafka. This can be extended over time.
+type KafkaConfig struct {
+       broker string // bootstrap broker address, e.g. "host:9092"
+       group string  // consumer group id
+       topic string  // topic to consume from
+}
+
+// GetBroker returns kafka broker configured.
+func (k KafkaConfig) GetBroker() string {
+       return k.broker
+}
+
+// GetGroup returns kafka group configured.
+func (k KafkaConfig) GetGroup() string {
+       return k.group
+}
+
+// GetTopic returns kafka topic configured.
+func (k KafkaConfig) GetTopic() string {
+       return k.topic
+}
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/logutils.go
new file mode 100644 (file)
index 0000000..0f72e71
--- /dev/null
@@ -0,0 +1,32 @@
+package utils
+
+import (
+       "go.uber.org/zap"
+       "fmt"
+       "sync"
+)
+
+
+
+// logOnce guards the one-time construction of the shared logger.
+var logOnce sync.Once
+
+// logger is the process-wide sugared logger, built lazily on first use.
+var logger *zap.SugaredLogger
+
+// GetLoggerInstance returns the singleton SugaredLogger, constructing it on
+// the first call; every later call returns the same instance.
+func GetLoggerInstance() *zap.SugaredLogger {
+       logOnce.Do(func() { logger = createLogger() })
+       return logger
+}
+
+
+//createLogger returns a SugaredLogger, sugaredLogger can be directly used to generate logs
+func createLogger() (*zap.SugaredLogger){
+       logger, err := zap.NewDevelopment()
+       if err != nil {
+               fmt.Printf("can't initialize zap logger: %v", err)
+       }
+       defer logger.Sync()
+       slogger := logger.Sugar()
+       return slogger
+}
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/pkg/utils/readJson.go
new file mode 100644 (file)
index 0000000..bfab64e
--- /dev/null
@@ -0,0 +1,28 @@
+package utils
+
+import (
+       "os"
+       "io/ioutil"
+)
+
+
+//ReadJSON reads the content of a give file and returns as a string
+// used for small config files only.
+func ReadJSON(path string) string {
+       slogger := GetLoggerInstance()
+       jsonFile, err := os.Open(path)
+       if err!=nil{
+               //fmt.Print(err)
+               slogger.Errorf("Unable to open file: %s", path)
+               slogger.Errorf("Error::: %s", err)
+
+       }else{
+               slogger.Infof("Successfully opened config.json")
+       }
+       
+       defer jsonFile.Close()
+       byteValue, _ := ioutil.ReadAll(jsonFile)
+       s := string(byteValue)
+       return s
+}
+
diff --git a/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml b/vnfs/DAaaS/microservices/GoApps/src/go-hdfs-writer/skaffold.yaml
new file mode 100644 (file)
index 0000000..c207967
--- /dev/null
@@ -0,0 +1,19 @@
+apiVersion: skaffold/v1beta6
+kind: Config
+build:
+  tagPolicy:
+    sha256: {}
+  artifacts:
+    - context: .
+      image: hdfs-writer
+  local:
+    useBuildkit: false
+    useDockerCLI: false
+deploy:
+  kubectl:
+    manifests:
+      - kubernetes-manifests/**
+profiles:
+  - name: cloudbuild
+    build:
+      googleCloudBuild: {}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/.gitignore b/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/.gitignore
deleted file mode 100644 (file)
index ad4781d..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-# Common
-.DS_Store
-.vscode
-*-workspace
-.tox/
-.*.swp
-*.log
-coverage.html
-docs/build
-.Makefile.bk
-
-# Directories
-pkg
-bin
-target
-src/github.com
-src/golang.org
-src/k8splugin/vendor
-src/k8splugin/.vendor-new/
-src/k8splugin/kubeconfig/*
-deployments/k8plugin
-
-# Binaries
-*.so
-src/k8splugin/csar/mock_plugins/*.so
-src/k8splugin/plugins/**/*.so
-
-# Tests
-*.test
-*.out
-
-# KRD
-.vagrant/
-kud/hosting_providers/vagrant/inventory/hosts.ini
-kud/hosting_providers/vagrant/inventory/artifacts
-kud/hosting_providers/vagrant/inventory/group_vars/all.yml
-kud/hosting_providers/vagrant/config/pdf.yml
-kud/hosting_providers/vagrant/sources.list
-*.retry
-*.vdi
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/Makefile b/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/Makefile
deleted file mode 100644 (file)
index e3f2140..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-# SPDX-license-identifier: Apache-2.0
-##############################################################################
-# Copyright (c) 2019 Intel Corporation
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-
-GOPATH := $(shell realpath "$(PWD)/../../")
-#export GOPATH=/Users/rajamoha/projects/demo/vnfs/DAaaS/GoApps
-export GOPATH ...
-export GO111MODULE=on
-
-
-
-all:
-       echo $$GOPATH
-       GOOS=linux GOARCH=amd64
-       @go build -tags netgo -o ./bin/hdfs-writer ./cmd/main.go
-
-build:
-       echo $$GOPATH
-       GOOS=linux GOARCH=amd64
-       @go build -tags netgo -o ./bin/hdfs-writer ./cmd/main.go
-
-.PHONY: format
-format:
-       @go fmt ./...
-
-.PHONY: clean
-clean:
-       @find . -name "*so" -delete
-       @rm -f ./bin/hdfs-writer
diff --git a/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/README.md b/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/README.md
deleted file mode 100644 (file)
index 453b842..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-<!-- Copyright 2019 Intel Corporation.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-    http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License. -->
-
-# HDFS-writer
-
-Read a topic in kafka and write into HDFS directory.
\ No newline at end of file
diff --git a/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/cmd/main.go b/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/cmd/main.go
deleted file mode 100644 (file)
index 11350f0..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-package main
-
-import (
-       "os"
-       "fmt"
-       "log"
-       "github.com/colinmarc/hdfs/v2"
-)
-
-func main() {
-       log.Println("Starting the HDFS writer")
-       localSourceFile := os.Args[1]
-       hdfsDestination := os.Args[2]
-
-       log.Println("localSourceFile:: "+localSourceFile)
-       log.Println("hdfsDestination:: "+hdfsDestination)
-
-       client, _ := hdfs.New("hdfs://hdfs-1-namenode-1.hdfs-1-namenode.hdfs1.svc.cluster.local:8020")
-       file, _ := client.Open("/kafka.txt")
-
-       buf := make([]byte, 59)
-       file.ReadAt(buf, 48847)
-       fmt.Println(string(buf))
-
-}
diff --git a/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/go.mod b/vnfs/DAaaS/microservices/GoApps/src/hdfs-writer/go.mod
deleted file mode 100644 (file)
index b285512..0000000
+++ /dev/null
@@ -1,3 +0,0 @@
-module hdfs-writer
-
-require github.com/colinmarc/hdfs/v2 v2.0.0 // indirect