Implementation of status notification mechanism 55/127455/2 0.10.0
authorLukasz Rajewski <lukasz.rajewski@orange.com>
Tue, 15 Feb 2022 21:39:37 +0000 (22:39 +0100)
committerLukasz Rajewski <lukasz.rajewski@orange.com>
Wed, 2 Mar 2022 21:46:03 +0000 (22:46 +0100)
- Subscription CRUD endpoints
- Subscription notifu executor
- Cleanup of subscriptions on instance delete
- Sending notification to the specified callback

Issue-ID: MULTICLOUD-1445
Signed-off-by: Lukasz Rajewski <lukasz.rajewski@orange.com>
Change-Id: I5b867a348e916f6c2c471bcc5326c831d832f45e

12 files changed:
src/k8splugin/api/api.go
src/k8splugin/api/brokerhandler_test.go
src/k8splugin/api/defhandler_test.go
src/k8splugin/api/healthcheckhandler_test.go
src/k8splugin/api/instancehandler.go
src/k8splugin/api/instancehandler_test.go
src/k8splugin/api/profilehandler_test.go
src/k8splugin/api/statussubhandler.go [new file with mode: 0644]
src/k8splugin/cmd/main.go
src/k8splugin/internal/app/client.go
src/k8splugin/internal/app/instance.go
src/k8splugin/internal/app/subscription.go [new file with mode: 0644]

index 64c83e0..64959f5 100644 (file)
@@ -1,7 +1,7 @@
 /*
 Copyright 2018 Intel Corporation.
 Copyright © 2021 Samsung Electronics
-Copyright © 2021 Orange
+Copyright © 2022 Orange
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@ func NewRouter(defClient rb.DefinitionManager,
        configClient app.ConfigManager,
        connectionClient connection.ConnectionManager,
        templateClient rb.ConfigTemplateManager,
+       subscriptionClient app.InstanceStatusSubManager,
        healthcheckClient healthcheck.InstanceHCManager) *mux.Router {
 
        router := mux.NewRouter()
@@ -52,7 +53,6 @@ func NewRouter(defClient rb.DefinitionManager,
                        "profile-name", "{profile-name}").Methods("GET")
        //Want to get full Data -> add query param: /install/{instID}?full=true
        instRouter.HandleFunc("/instance/{instID}", instHandler.getHandler).Methods("GET")
-       instRouter.HandleFunc("/instance/{instID}/status", instHandler.statusHandler).Methods("GET")
        instRouter.HandleFunc("/instance/{instID}/upgrade", instHandler.upgradeHandler).Methods("POST")
        instRouter.HandleFunc("/instance/{instID}/query", instHandler.queryHandler).Methods("GET")
        instRouter.HandleFunc("/instance/{instID}/query", instHandler.queryHandler).
@@ -62,6 +62,19 @@ func NewRouter(defClient rb.DefinitionManager,
                        "Labels", "{Labels}").Methods("GET")
        instRouter.HandleFunc("/instance/{instID}", instHandler.deleteHandler).Methods("DELETE")
 
+       // Status handler routes
+       if subscriptionClient == nil {
+               subscriptionClient = app.NewInstanceStatusSubClient()
+               subscriptionClient.RestoreWatchers()
+       }
+       instanceStatusSubHandler := instanceStatusSubHandler{client: subscriptionClient}
+       instRouter.HandleFunc("/instance/{instID}/status", instHandler.statusHandler).Methods("GET")
+       instRouter.HandleFunc("/instance/{instID}/status/subscription", instanceStatusSubHandler.listHandler).Methods("GET")
+       instRouter.HandleFunc("/instance/{instID}/status/subscription", instanceStatusSubHandler.createHandler).Methods("POST")
+       instRouter.HandleFunc("/instance/{instID}/status/subscription/{subID}", instanceStatusSubHandler.getHandler).Methods("GET")
+       instRouter.HandleFunc("/instance/{instID}/status/subscription/{subID}", instanceStatusSubHandler.updateHandler).Methods("PUT")
+       instRouter.HandleFunc("/instance/{instID}/status/subscription/{subID}", instanceStatusSubHandler.deleteHandler).Methods("DELETE")
+
        // Query handler routes
        if queryClient == nil {
                queryClient = app.NewQueryClient()
index 767cae1..a08b8a9 100644 (file)
@@ -313,7 +313,7 @@ func TestBrokerCreateHandler(t *testing.T) {
                t.Run(testCase.label, func(t *testing.T) {
 
                        request := httptest.NewRequest("POST", "/cloudowner/cloudregion/infra_workload", testCase.input)
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
                        defer resp.Body.Close()
 
                        if testCase.expectedCode != resp.StatusCode {
@@ -409,7 +409,7 @@ func TestBrokerGetHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("GET", "/cloudowner/cloudregion/infra_workload/"+testCase.input, nil)
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
 
                        if testCase.expectedCode != resp.StatusCode {
                                t.Fatalf("Request method returned: %v and it was expected: %v",
@@ -489,7 +489,7 @@ func TestBrokerFindHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("GET", "/cloudowner/cloudregion/infra_workload?name="+testCase.input, nil)
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
 
                        if testCase.expectedCode != resp.StatusCode {
                                t.Fatalf("Request method returned: %v and it was expected: %v",
@@ -551,7 +551,7 @@ func TestBrokerDeleteHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("DELETE", "/cloudowner/cloudregion/infra_workload/"+testCase.input, nil)
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
 
                        if testCase.expectedCode != resp.StatusCode {
                                t.Fatalf("Request method returned: %v and it was expected: %v", resp.StatusCode, testCase.expectedCode)
index b626b6f..22d4548 100644 (file)
@@ -139,7 +139,7 @@ func TestRBDefCreateHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("POST", "/v1/rb/definition", testCase.reader)
-                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -208,7 +208,7 @@ func TestRBDefListVersionsHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("GET", "/v1/rb/definition/testresourcebundle", nil)
-                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -288,7 +288,7 @@ func TestRBDefListAllHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("GET", "/v1/rb/definition", nil)
-                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -368,7 +368,7 @@ func TestRBDefGetHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("GET", "/v1/rb/definition/"+testCase.name+"/"+testCase.version, nil)
-                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -419,7 +419,7 @@ func TestRBDefDeleteHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("DELETE", "/v1/rb/definition/"+testCase.name+"/"+testCase.version, nil)
-                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -476,7 +476,7 @@ func TestRBDefUploadHandler(t *testing.T) {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("POST",
                                "/v1/rb/definition/"+testCase.name+"/"+testCase.version+"/content", testCase.body)
-                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
index 3a03d90..c6c07c1 100644 (file)
@@ -35,7 +35,7 @@ func TestHealthCheckHandler(t *testing.T) {
                        Err: nil,
                }
                request := httptest.NewRequest("GET", "/v1/healthcheck", nil)
-               resp := executeRequest(request, NewRouter(nil, nil, nil, nil, nil, nil, nil, nil))
+               resp := executeRequest(request, NewRouter(nil, nil, nil, nil, nil, nil, nil, nil, nil))
 
                //Check returned code
                if resp.StatusCode != http.StatusOK {
@@ -48,7 +48,7 @@ func TestHealthCheckHandler(t *testing.T) {
                        Err: pkgerrors.New("Runtime Error in DB"),
                }
                request := httptest.NewRequest("GET", "/v1/healthcheck", nil)
-               resp := executeRequest(request, NewRouter(nil, nil, nil, nil, nil, nil, nil, nil))
+               resp := executeRequest(request, NewRouter(nil, nil, nil, nil, nil, nil, nil, nil, nil))
 
                //Check returned code
                if resp.StatusCode != http.StatusInternalServerError {
index 6d1fd7b..dd5fd0d 100644 (file)
@@ -211,7 +211,7 @@ func (i instanceHandler) statusHandler(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        id := vars["instID"]
 
-       resp, err := i.client.Status(id)
+       resp, err := i.client.Status(id, true)
        if err != nil {
                log.Error("Error getting Status", log.Fields{
                        "error": err,
index 444b669..f06af44 100644 (file)
@@ -72,7 +72,7 @@ func (m *mockInstanceClient) Query(id, apiVersion, kind, name, labels string) (a
        return m.statusItem, nil
 }
 
-func (m *mockInstanceClient) Status(id string) (app.InstanceStatus, error) {
+func (m *mockInstanceClient) Status(id string, checkReady bool) (app.InstanceStatus, error) {
        if m.err != nil {
                return app.InstanceStatus{}, m.err
        }
@@ -205,7 +205,7 @@ func TestInstanceCreateHandler(t *testing.T) {
                t.Run(testCase.label, func(t *testing.T) {
 
                        request := httptest.NewRequest("POST", "/v1/instance", testCase.input)
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
 
                        if testCase.expectedCode != resp.StatusCode {
                                body, _ := ioutil.ReadAll(resp.Body)
@@ -306,7 +306,7 @@ func TestInstanceGetHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("GET", "/v1/instance/"+testCase.input, nil)
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
 
                        if testCase.expectedCode != resp.StatusCode {
                                t.Fatalf("Request method returned: %v and it was expected: %v",
@@ -441,7 +441,7 @@ func TestInstanceListHandler(t *testing.T) {
                                }
                                request.URL.RawQuery = q.Encode()
                        }
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
 
                        if testCase.expectedCode != resp.StatusCode {
                                t.Fatalf("Request method returned: %v and it was expected: %v",
@@ -500,7 +500,7 @@ func TestDeleteHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("DELETE", "/v1/instance/"+testCase.input, nil)
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
 
                        if testCase.expectedCode != resp.StatusCode {
                                t.Fatalf("Request method returned: %v and it was expected: %v", resp.StatusCode, testCase.expectedCode)
@@ -734,7 +734,7 @@ func TestInstanceQueryHandler(t *testing.T) {
                        }
                        url := "/v1/instance/" + testCase.id + "/query?" + params.Encode()
                        request := httptest.NewRequest("GET", url, nil)
-                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil))
 
                        if testCase.expectedCode != resp.StatusCode {
                                body, _ := ioutil.ReadAll(resp.Body)
index 181b775..5704066 100644 (file)
@@ -127,7 +127,7 @@ func TestRBProfileCreateHandler(t *testing.T) {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("POST", "/v1/rb/definition/test-rbdef/v1/profile",
                                testCase.reader)
-                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -207,7 +207,7 @@ func TestRBProfileGetHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("GET", "/v1/rb/definition/test-rbdef/v1/profile/"+testCase.prname, nil)
-                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -288,7 +288,7 @@ func TestRBProfileListHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("GET", "/v1/rb/definition/"+testCase.def+"/"+testCase.version+"/profile", nil)
-                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -347,7 +347,7 @@ func TestRBProfileDeleteHandler(t *testing.T) {
        for _, testCase := range testCases {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("DELETE", "/v1/rb/definition/test-rbdef/v1/profile/"+testCase.prname, nil)
-                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
@@ -400,7 +400,7 @@ func TestRBProfileUploadHandler(t *testing.T) {
                t.Run(testCase.label, func(t *testing.T) {
                        request := httptest.NewRequest("POST",
                                "/v1/rb/definition/test-rbdef/v1/profile/"+testCase.prname+"/content", testCase.body)
-                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil))
+                       resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil))
 
                        //Check returned code
                        if resp.StatusCode != testCase.expectedCode {
diff --git a/src/k8splugin/api/statussubhandler.go b/src/k8splugin/api/statussubhandler.go
new file mode 100644 (file)
index 0000000..c5c8de2
--- /dev/null
@@ -0,0 +1,229 @@
+/*
+Copyright © 2022 Orange
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package api
+
+import (
+       "encoding/json"
+       "io"
+       "net/http"
+
+       "github.com/onap/multicloud-k8s/src/k8splugin/internal/app"
+       log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
+
+       "github.com/gorilla/mux"
+)
+
+// Used to store the backend implementation objects
+// Also simplifies the mocking needed for unit testing
+type instanceStatusSubHandler struct {
+       // Interface that implements Status Subscription operations
+       client app.InstanceStatusSubManager
+}
+
+func (iss instanceStatusSubHandler) createHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       id := vars["instID"]
+
+       var subRequest app.SubscriptionRequest
+
+       err := json.NewDecoder(r.Body).Decode(&subRequest)
+       switch {
+       case err == io.EOF:
+               log.Error("Body Empty", log.Fields{
+                       "error": io.EOF,
+               })
+               http.Error(w, "Body empty", http.StatusBadRequest)
+               return
+       case err != nil:
+               log.Error("Error unmarshaling Body", log.Fields{
+                       "error": err,
+               })
+               http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+               return
+       }
+
+       // Name is required.
+       if subRequest.Name == "" {
+               http.Error(w, "Missing name in POST request", http.StatusBadRequest)
+               return
+       }
+
+       // MinNotifyInterval cannot be less than 0
+       if subRequest.MinNotifyInterval < 0 {
+               http.Error(w, "Min Notify Interval has invalid value", http.StatusBadRequest)
+               return
+       }
+
+       // CallbackUrl is required
+       if subRequest.CallbackUrl == "" {
+               http.Error(w, "CallbackUrl has invalid value", http.StatusBadRequest)
+               return
+       }
+
+       resp, err := iss.client.Create(id, subRequest)
+       if err != nil {
+               log.Error("Error creating subscription", log.Fields{
+                       "error":    err,
+                       "instance": id,
+                       "response": resp,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusCreated)
+       err = json.NewEncoder(w).Encode(resp)
+       if err != nil {
+               log.Error("Error Marshaling Response", log.Fields{
+                       "error":    err,
+                       "response": resp,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+}
+
+func (iss instanceStatusSubHandler) getHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       instanceId := vars["instID"]
+       subId := vars["subID"]
+
+       resp, err := iss.client.Get(instanceId, subId)
+       if err != nil {
+               log.Error("Error getting instance's Status Subscription", log.Fields{
+                       "error":          err,
+                       "instanceID":     instanceId,
+                       "subscriptionID": subId,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       err = json.NewEncoder(w).Encode(resp)
+       if err != nil {
+               log.Error("Error Marshaling Response", log.Fields{
+                       "error":    err,
+                       "response": resp,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+}
+
+func (iss instanceStatusSubHandler) updateHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       instanceId := vars["instID"]
+       subId := vars["subID"]
+
+       var subRequest app.SubscriptionRequest
+
+       err := json.NewDecoder(r.Body).Decode(&subRequest)
+       switch {
+       case err == io.EOF:
+               log.Error("Body Empty", log.Fields{
+                       "error": io.EOF,
+               })
+               http.Error(w, "Body empty", http.StatusBadRequest)
+               return
+       case err != nil:
+               log.Error("Error unmarshaling Body", log.Fields{
+                       "error": err,
+               })
+               http.Error(w, err.Error(), http.StatusUnprocessableEntity)
+               return
+       }
+
+       // MinNotifyInterval cannot be less than 0
+       if subRequest.MinNotifyInterval < 0 {
+               http.Error(w, "Min Notify Interval has invalid value", http.StatusBadRequest)
+               return
+       }
+
+       // CallbackUrl is required
+       if subRequest.CallbackUrl == "" {
+               http.Error(w, "CallbackUrl has invalid value", http.StatusBadRequest)
+               return
+       }
+
+       resp, err := iss.client.Update(instanceId, subId, subRequest)
+       if err != nil {
+               log.Error("Error updating instance's Status Subscription", log.Fields{
+                       "error":          err,
+                       "instanceID":     instanceId,
+                       "subscriptionID": subId,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       err = json.NewEncoder(w).Encode(resp)
+       if err != nil {
+               log.Error("Error Marshaling Response", log.Fields{
+                       "error":    err,
+                       "response": resp,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+}
+
+func (iss instanceStatusSubHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       instanceId := vars["instID"]
+       subId := vars["subID"]
+
+       err := iss.client.Delete(instanceId, subId)
+       if err != nil {
+               log.Error("Error deleting instance's Status Subscription", log.Fields{
+                       "error":          err,
+                       "instanceID":     instanceId,
+                       "subscriptionID": subId,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+       w.WriteHeader(http.StatusAccepted)
+}
+
+func (iss instanceStatusSubHandler) listHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       id := vars["instID"]
+
+       resp, err := iss.client.List(id)
+       if err != nil {
+               log.Error("Error listing instance Status Subscriptions", log.Fields{
+                       "error":       err,
+                       "instance-id": id,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       err = json.NewEncoder(w).Encode(resp)
+       if err != nil {
+               log.Error("Error Marshaling Response", log.Fields{
+                       "error":    err,
+                       "response": resp,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+}
index ff00613..23147b5 100644 (file)
@@ -16,7 +16,6 @@ package main
 
 import (
        "context"
-       "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
        "log"
        "math/rand"
        "net/http"
@@ -27,6 +26,7 @@ import (
        "github.com/onap/multicloud-k8s/src/k8splugin/api"
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/auth"
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
+       "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
 
        "github.com/gorilla/handlers"
 )
@@ -40,7 +40,7 @@ func main() {
 
        rand.Seed(time.Now().UnixNano())
 
-       httpRouter := api.NewRouter(nil, nil, nil, nil, nil, nil, nil, nil)
+       httpRouter := api.NewRouter(nil, nil, nil, nil, nil, nil, nil, nil, nil)
        loggedRouter := handlers.LoggingHandler(os.Stdout, httpRouter)
        log.Println("Starting Kubernetes Multicloud API")
 
index 3aabda2..cbd3dd5 100644 (file)
@@ -28,6 +28,7 @@ import (
        //appsv1beta2 "k8s.io/api/apps/v1beta2"
        batchv1 "k8s.io/api/batch/v1"
        corev1 "k8s.io/api/core/v1"
+       v1 "k8s.io/api/core/v1"
 
        //extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
        //apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -53,9 +54,11 @@ import (
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/client-go/discovery/cached/disk"
        "k8s.io/client-go/dynamic"
+       "k8s.io/client-go/dynamic/dynamicinformer"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
        "k8s.io/client-go/restmapper"
+       "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
 )
 
@@ -640,3 +643,17 @@ func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
 func (k *KubernetesClient) GetInstanceID() string {
        return k.instanceID
 }
+
+func (k *KubernetesClient) GetInformer(gvk schema.GroupVersionKind) (cache.SharedInformer, error) {
+       labelOptions := dynamicinformer.TweakListOptionsFunc(func(opts *metav1.ListOptions) {
+               opts.LabelSelector = config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID
+       })
+
+       factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.GetDynamicClient(), 0, v1.NamespaceAll, labelOptions)
+       mapping, err := k.GetMapper().RESTMapping(gvk.GroupKind(), gvk.Version)
+       if err != nil {
+               return nil, pkgerrors.Wrap(err, "Preparing mapper based on GVK")
+       }
+       informer := factory.ForResource(mapping.Resource).Informer()
+       return informer, nil
+}
index e78eea7..91e2150 100644 (file)
@@ -121,7 +121,7 @@ type InstanceManager interface {
        Upgrade(id string, u UpgradeRequest) (InstanceResponse, error)
        Get(id string) (InstanceResponse, error)
        GetFull(id string) (InstanceDbData, error)
-       Status(id string) (InstanceStatus, error)
+       Status(id string, checkReady bool) (InstanceStatus, error)
        Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error)
        List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error)
        Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error)
@@ -813,7 +813,7 @@ func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (Insta
 }
 
 // Status returns the status for the instance
-func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
+func (v *InstanceClient) Status(id string, checkReady bool) (InstanceStatus, error) {
        //Read the status from the DB
        key := InstanceKey{
                ID: id,
@@ -867,12 +867,14 @@ Main:
                        isReady = false
                } else {
                        generalStatus = append(generalStatus, status)
-                       ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status)
+                       if checkReady {
+                               ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status)
 
-                       if !ready || err != nil {
-                               isReady = false
-                               if err != nil {
-                                       cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
+                               if !ready || err != nil {
+                                       isReady = false
+                                       if err != nil {
+                                               cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
+                                       }
                                }
                        }
                }
@@ -905,7 +907,7 @@ Main:
        resp := InstanceStatus{
                Request:         resResp.Request,
                ResourceCount:   int32(len(generalStatus)),
-               Ready:           isReady && resResp.Status == "DONE",
+               Ready:           checkReady && isReady && resResp.Status == "DONE",
                ResourcesStatus: generalStatus,
        }
 
@@ -914,7 +916,6 @@ Main:
                        strings.Join(cumulatedErrorMsg, "\n"))
                return resp, err
        }
-       //TODO Filter response content by requested verbosity (brief, ...)?
 
        return resp, nil
 }
@@ -925,7 +926,6 @@ func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient K
        defer cancel()
 
        apiVersion, kind := rss.GVK.ToAPIVersionAndKind()
-       log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind)
 
        var parsedRes runtime.Object
        //TODO: Should we care about different api version for a same kind?
@@ -1143,6 +1143,12 @@ func (v *InstanceClient) Delete(id string) error {
                        }
                }()
        } else {
+               subscriptionClient := NewInstanceStatusSubClient()
+               err = subscriptionClient.Cleanup(id)
+               if err != nil {
+                       log.Printf(err.Error())
+               }
+
                err = db.DBconn.Delete(v.storeName, key, v.tagInst)
                if err != nil {
                        return pkgerrors.Wrap(err, "Delete Instance")
@@ -1288,6 +1294,11 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H
                return pkgerrors.Wrap(err, "Error running post-delete hooks")
        }
        if clearDb {
+               subscriptionClient := NewInstanceStatusSubClient()
+               err = subscriptionClient.Cleanup(instance.ID)
+               if err != nil {
+                       log.Printf(err.Error())
+               }
                err = db.DBconn.Delete(v.storeName, key, v.tagInst)
                if err != nil {
                        log.Printf("Delete Instance DB Entry for release %s has error.", instance.ReleaseName)
diff --git a/src/k8splugin/internal/app/subscription.go b/src/k8splugin/internal/app/subscription.go
new file mode 100644 (file)
index 0000000..9b4a1aa
--- /dev/null
@@ -0,0 +1,752 @@
+/*
+ * Copyright © 2022 Orange
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package app
+
+import (
+       "bytes"
+       "encoding/json"
+       "io/ioutil"
+       "net"
+       "net/http"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+       log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
+       "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb"
+       pkgerrors "github.com/pkg/errors"
+       "k8s.io/apimachinery/pkg/runtime/schema"
+       "k8s.io/client-go/tools/cache"
+)
+
+// QueryStatus is what is returned when status is queried for an instance
+type StatusSubscription struct {
+       Name              string                 `json:"name"`
+       MinNotifyInterval int32                  `json:"min-notify-interval"`
+       LastUpdateTime    time.Time              `json:"last-update-time"`
+       CallbackUrl       string                 `json:"callback-url"`
+       LastNotifyTime    time.Time              `json:"last-notify-time"`
+       LastNotifyStatus  int32                  `json:"last-notify-status"`
+       NotifyMetadata    map[string]interface{} `json:"metadata"`
+}
+
+type SubscriptionRequest struct {
+       Name              string                 `json:"name"`
+       MinNotifyInterval int32                  `json:"min-notify-interval"`
+       NotifyMetadata    map[string]interface{} `json:"metadata"`
+       CallbackUrl       string                 `json:"callback-url"`
+}
+
+// StatusSubscriptionKey is used as the primary key in the db
+type StatusSubscriptionKey struct {
+       InstanceId       string `json:"instanceid"`
+       SubscriptionName string `json:"subscriptionname"`
+}
+
+// We will use json marshalling to convert to string to
+// preserve the underlying structure.
+func (dk StatusSubscriptionKey) String() string {
+       out, err := json.Marshal(dk)
+       if err != nil {
+               return ""
+       }
+
+       return string(out)
+}
+
+// InstanceStatusSubClient implements InstanceStatusSubManager
+type InstanceStatusSubClient struct {
+       storeName string
+       tagInst   string
+}
+
+func NewInstanceStatusSubClient() *InstanceStatusSubClient {
+       return &InstanceStatusSubClient{
+               storeName: "rbdef",
+               tagInst:   "instanceStatusSub",
+       }
+}
+
+type notifyResult struct {
+       result int32
+       time   time.Time
+}
+
+type resourceStatusDelta struct {
+       Created  []ResourceStatus `json:"created"`
+       Deleted  []ResourceStatus `json:"deleted"`
+       Modified []ResourceStatus `json:"modified"`
+}
+
+type notifyRequestPayload struct {
+       InstanceId   string                 `json:"instance-id"`
+       Subscription string                 `json:"subscription-name"`
+       Metadata     map[string]interface{} `json:"metadata"`
+       Delta        resourceStatusDelta    `json:"resource-changes"`
+}
+
+func (rsd resourceStatusDelta) Delta() bool {
+       return len(rsd.Created) > 0 || len(rsd.Deleted) > 0 || len(rsd.Modified) > 0
+}
+
+type notifyChannelData struct {
+       instanceId   string
+       subscription StatusSubscription
+       action       string
+       delta        resourceStatusDelta
+       notifyResult chan notifyResult
+}
+
+type subscriptionWatch struct {
+       watcherStop    chan struct{}
+       lastUpdateTime time.Time
+}
+
+type subscriptionWatchManager struct {
+       watchersStatus map[string]subscriptionWatch
+}
+
+type subscriptionNotifyManager struct {
+       notifyLockMap  map[string]*sync.Mutex
+       notifyChannel  map[string]chan notifyChannelData
+       watchersStatus map[string]subscriptionWatchManager
+       sync.Mutex
+}
+
+var subscriptionNotifyData = subscriptionNotifyManager{
+       notifyLockMap:  map[string]*sync.Mutex{},
+       notifyChannel:  map[string]chan notifyChannelData{},
+       watchersStatus: map[string]subscriptionWatchManager{},
+}
+
+// InstanceStatusSubManager is an interface exposes the status subscription functionality
+type InstanceStatusSubManager interface {
+       Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error)
+       Get(instanceId, subId string) (StatusSubscription, error)
+       Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error)
+       List(instanceId string) ([]StatusSubscription, error)
+       Delete(instanceId, subId string) error
+       Cleanup(instanceId string) error
+       RestoreWatchers()
+}
+
+// Create Status Subscription
+func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) {
+
+       _, err := iss.Get(instanceId, subDetails.Name)
+       if err == nil {
+               return StatusSubscription{}, pkgerrors.New("Subscription already exists")
+       }
+
+       lock, _, _ := getSubscriptionData(instanceId)
+
+       key := StatusSubscriptionKey{
+               InstanceId:       instanceId,
+               SubscriptionName: subDetails.Name,
+       }
+
+       sub := StatusSubscription{
+               Name:              subDetails.Name,
+               MinNotifyInterval: subDetails.MinNotifyInterval,
+               LastNotifyStatus:  0,
+               CallbackUrl:       subDetails.CallbackUrl,
+               LastUpdateTime:    time.Now(),
+               LastNotifyTime:    time.Now(),
+               NotifyMetadata:    subDetails.NotifyMetadata,
+       }
+       if sub.NotifyMetadata == nil {
+               sub.NotifyMetadata = make(map[string]interface{})
+       }
+
+       err = iss.refreshWatchers(instanceId, subDetails.Name)
+       if err != nil {
+               return sub, pkgerrors.Wrap(err, "Creating Status Subscription DB Entry")
+       }
+
+       lock.Lock()
+       defer lock.Unlock()
+
+       err = db.DBconn.Create(iss.storeName, key, iss.tagInst, sub)
+       if err != nil {
+               return sub, pkgerrors.Wrap(err, "Creating Status Subscription DB Entry")
+       }
+       log.Info("Successfully created Status Subscription", log.Fields{
+               "InstanceId":       instanceId,
+               "SubscriptionName": subDetails.Name,
+       })
+
+       go runNotifyThread(instanceId, sub.Name)
+
+       return sub, nil
+}
+
+// Get Status subscription
+func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscription, error) {
+       lock, _, _ := getSubscriptionData(instanceId)
+       // Acquire Mutex
+       lock.Lock()
+       defer lock.Unlock()
+       key := StatusSubscriptionKey{
+               InstanceId:       instanceId,
+               SubscriptionName: subId,
+       }
+       DBResp, err := db.DBconn.Read(iss.storeName, key, iss.tagInst)
+       if err != nil || DBResp == nil {
+               return StatusSubscription{}, pkgerrors.Wrap(err, "Error retrieving Subscription data")
+       }
+
+       sub := StatusSubscription{}
+       err = db.DBconn.Unmarshal(DBResp, &sub)
+       if err != nil {
+               return StatusSubscription{}, pkgerrors.Wrap(err, "Demarshalling Subscription Value")
+       }
+       return sub, nil
+}
+
+// Update status subscription
+func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) {
+       sub, err := iss.Get(instanceId, subDetails.Name)
+       if err != nil {
+               return StatusSubscription{}, pkgerrors.Wrap(err, "Subscription does not exist")
+       }
+
+       lock, _, _ := getSubscriptionData(instanceId)
+
+       key := StatusSubscriptionKey{
+               InstanceId:       instanceId,
+               SubscriptionName: subDetails.Name,
+       }
+
+       sub.MinNotifyInterval = subDetails.MinNotifyInterval
+       sub.CallbackUrl = subDetails.CallbackUrl
+       sub.NotifyMetadata = subDetails.NotifyMetadata
+       if sub.NotifyMetadata == nil {
+               sub.NotifyMetadata = make(map[string]interface{})
+       }
+
+       err = iss.refreshWatchers(instanceId, subDetails.Name)
+       if err != nil {
+               return sub, pkgerrors.Wrap(err, "Updating Status Subscription DB Entry")
+       }
+
+       lock.Lock()
+       defer lock.Unlock()
+
+       err = db.DBconn.Update(iss.storeName, key, iss.tagInst, sub)
+       if err != nil {
+               return sub, pkgerrors.Wrap(err, "Updating Status Subscription DB Entry")
+       }
+       log.Info("Successfully updated Status Subscription", log.Fields{
+               "InstanceId":       instanceId,
+               "SubscriptionName": subDetails.Name,
+       })
+
+       return sub, nil
+}
+
+// Get list of status subscriptions
+func (iss *InstanceStatusSubClient) List(instanceId string) ([]StatusSubscription, error) {
+
+       lock, _, _ := getSubscriptionData(instanceId)
+       // Acquire Mutex
+       lock.Lock()
+       defer lock.Unlock()
+       // Retrieve info about created status subscriptions
+       dbResp, err := db.DBconn.ReadAll(iss.storeName, iss.tagInst)
+       if err != nil {
+               if !strings.Contains(err.Error(), "Did not find any objects with tag") {
+                       return []StatusSubscription{}, pkgerrors.Wrap(err, "Getting Status Subscription data")
+               }
+       }
+       subList := make([]StatusSubscription, 0)
+       for key, value := range dbResp {
+               if key != "" {
+                       subKey := StatusSubscriptionKey{}
+                       err = json.Unmarshal([]byte(key), &subKey)
+                       if err != nil {
+                               log.Error("Error demarshaling Status Subscription Key DB data", log.Fields{
+                                       "error": err.Error(),
+                                       "key":   key})
+                               return []StatusSubscription{}, pkgerrors.Wrap(err, "Demarshalling subscription key")
+                       }
+                       if subKey.InstanceId != instanceId {
+                               continue
+                       }
+               }
+               //value is a byte array
+               if value != nil {
+                       sub := StatusSubscription{}
+                       err = db.DBconn.Unmarshal(value, &sub)
+                       if err != nil {
+                               log.Error("Error demarshaling Status Subscription DB data", log.Fields{
+                                       "error": err.Error(),
+                                       "key":   key})
+                       }
+                       subList = append(subList, sub)
+               }
+       }
+
+       return subList, nil
+}
+
+// Delete status subscription
+func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error {
+       _, err := iss.Get(instanceId, subId)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Subscription does not exist")
+       }
+       lock, _, watchers := getSubscriptionData(instanceId)
+       // Acquire Mutex
+       lock.Lock()
+       defer lock.Unlock()
+
+       close(watchers.watchersStatus[subId].watcherStop)
+       delete(watchers.watchersStatus, subId)
+
+       key := StatusSubscriptionKey{
+               InstanceId:       instanceId,
+               SubscriptionName: subId,
+       }
+       err = db.DBconn.Delete(iss.storeName, key, iss.tagInst)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Removing Status Subscription in DB")
+       }
+       return nil
+}
+
+// Cleanup status subscriptions for instance
+func (iss *InstanceStatusSubClient) Cleanup(instanceId string) error {
+       subList, err := iss.List(instanceId)
+       if err != nil {
+               return err
+       }
+
+       for _, sub := range subList {
+               err = iss.Delete(instanceId, sub.Name)
+               if err != nil {
+                       log.Error("Error deleting ", log.Fields{
+                               "error": err.Error(),
+                               "key":   sub.Name})
+               }
+       }
+       removeSubscriptionData(instanceId)
+       return err
+}
+
+// Restore status subscriptions notify threads
+func (iss *InstanceStatusSubClient) RestoreWatchers() {
+       go func() {
+               time.Sleep(time.Second * 10)
+               log.Info("Restoring status subscription notifications", log.Fields{})
+               v := NewInstanceClient()
+               instances, err := v.List("", "", "")
+               if err != nil {
+                       log.Error("Error reading instance list", log.Fields{
+                               "error": err.Error(),
+                       })
+               }
+               for _, instance := range instances {
+                       subList, err := iss.List(instance.ID)
+                       if err != nil {
+                               log.Error("Error reading subscription list for instance", log.Fields{
+                                       "error":    err.Error(),
+                                       "instance": instance.ID,
+                               })
+                               continue
+                       }
+
+                       for _, sub := range subList {
+                               err = iss.refreshWatchers(instance.ID, sub.Name)
+                               if err != nil {
+                                       log.Error("Error on refreshing watchers", log.Fields{
+                                               "error":        err.Error(),
+                                               "instance":     instance.ID,
+                                               "subscription": sub.Name,
+                                       })
+                                       continue
+                               }
+                               go runNotifyThread(instance.ID, sub.Name)
+                       }
+               }
+       }()
+}
+
+func (iss *InstanceStatusSubClient) refreshWatchers(instanceId, subId string) error {
+       log.Info("REFRESH WATCHERS", log.Fields{
+               "instance":     instanceId,
+               "subscription": subId,
+       })
+       v := NewInstanceClient()
+       k8sClient := KubernetesClient{}
+       instance, err := v.Get(instanceId)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Cannot get instance for notify thread")
+       }
+       profile, err := rb.NewProfileClient().Get(instance.Request.RBName, instance.Request.RBVersion,
+               instance.Request.ProfileName)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Unable to find Profile instance status")
+       }
+       err = k8sClient.Init(instance.Request.CloudRegion, instanceId)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Cannot set k8s client for instance")
+       }
+
+       lock, _, watchers := getSubscriptionData(instanceId)
+       // Acquire Mutex
+       lock.Lock()
+       defer lock.Unlock()
+       watcher, ok := watchers.watchersStatus[subId]
+       if ok {
+               close(watcher.watcherStop)
+       } else {
+               watchers.watchersStatus[subId] = subscriptionWatch{
+                       lastUpdateTime: time.Now(),
+               }
+       }
+
+       watcher.watcherStop = make(chan struct{})
+
+       for _, gvk := range gvkListForInstance(instance, profile) {
+               informer, _ := k8sClient.GetInformer(gvk)
+               handlers := cache.ResourceEventHandlerFuncs{
+                       AddFunc: func(obj interface{}) {
+                               lock.Lock()
+                               watcher.lastUpdateTime = time.Now()
+                               watchers.watchersStatus[subId] = watcher
+                               lock.Unlock()
+                       },
+                       UpdateFunc: func(oldObj, obj interface{}) {
+                               lock.Lock()
+                               watcher.lastUpdateTime = time.Now()
+                               watchers.watchersStatus[subId] = watcher
+                               lock.Unlock()
+                       },
+                       DeleteFunc: func(obj interface{}) {
+                               lock.Lock()
+                               watcher.lastUpdateTime = time.Now()
+                               watchers.watchersStatus[subId] = watcher
+                               lock.Unlock()
+                       },
+               }
+               informer.AddEventHandler(handlers)
+               go func(informer cache.SharedInformer, stopper chan struct{}, fields log.Fields) {
+                       log.Info("[START] Watcher", fields)
+                       informer.Run(stopper)
+                       log.Info("[STOP] Watcher", fields)
+               }(informer, watcher.watcherStop, log.Fields{
+                       "Kind":         gvk.Kind,
+                       "Instance":     instanceId,
+                       "Subscription": subId,
+               })
+       }
+       return nil
+}
+
+// Get the Mutex for the Subscription
+func getSubscriptionData(instanceId string) (*sync.Mutex, chan notifyChannelData, subscriptionWatchManager) {
+       var key string = instanceId
+       subscriptionNotifyData.Lock()
+       defer subscriptionNotifyData.Unlock()
+       _, ok := subscriptionNotifyData.notifyLockMap[key]
+       if !ok {
+               subscriptionNotifyData.notifyLockMap[key] = &sync.Mutex{}
+       }
+       _, ok = subscriptionNotifyData.notifyChannel[key]
+       if !ok {
+               subscriptionNotifyData.notifyChannel[key] = make(chan notifyChannelData)
+               go scheduleNotifications(instanceId, subscriptionNotifyData.notifyChannel[key])
+               time.Sleep(time.Second * 5)
+       }
+       _, ok = subscriptionNotifyData.watchersStatus[key]
+       if !ok {
+               subscriptionNotifyData.watchersStatus[key] = subscriptionWatchManager{
+                       watchersStatus: make(map[string]subscriptionWatch),
+               }
+       }
+       return subscriptionNotifyData.notifyLockMap[key], subscriptionNotifyData.notifyChannel[key], subscriptionNotifyData.watchersStatus[key]
+}
+
+func removeSubscriptionData(instanceId string) {
+       var key string = instanceId
+       subscriptionNotifyData.Lock()
+       defer subscriptionNotifyData.Unlock()
+       _, ok := subscriptionNotifyData.notifyLockMap[key]
+       if ok {
+               delete(subscriptionNotifyData.notifyLockMap, key)
+       }
+       _, ok = subscriptionNotifyData.notifyChannel[key]
+       if ok {
+               crl := notifyChannelData{
+                       instanceId: instanceId,
+                       action:     "STOP",
+               }
+               subscriptionNotifyData.notifyChannel[key] <- crl
+               delete(subscriptionNotifyData.notifyChannel, key)
+       }
+       _, ok = subscriptionNotifyData.watchersStatus[key]
+       if !ok {
+               delete(subscriptionNotifyData.watchersStatus, key)
+       }
+}
+
+// notify request timeout
+func notifyTimeout(network, addr string) (net.Conn, error) {
+       return net.DialTimeout(network, addr, time.Duration(time.Second*5))
+}
+
+// Per Subscription Go routine to send notification about status change
+func scheduleNotifications(instanceId string, c chan notifyChannelData) {
+       // Keep thread running
+       log.Info("[START] status notify thread for ", log.Fields{
+               "instance": instanceId,
+       })
+       for {
+               data := <-c
+               breakThread := false
+               switch {
+               case data.action == "NOTIFY":
+                       var result = notifyResult{}
+                       var err error = nil
+                       var notifyPayload = notifyRequestPayload{
+                               Delta:        data.delta,
+                               InstanceId:   data.instanceId,
+                               Subscription: data.subscription.Name,
+                               Metadata:     data.subscription.NotifyMetadata,
+                       }
+                       notifyBody, err := json.Marshal(notifyPayload)
+                       if err == nil {
+                               notifyBodyBuffer := bytes.NewBuffer(notifyBody)
+                               transport := http.Transport{
+                                       Dial: notifyTimeout,
+                               }
+                               client := http.Client{
+                                       Transport: &transport,
+                               }
+                               resp, errReq := client.Post(data.subscription.CallbackUrl, "application/json", notifyBodyBuffer)
+                               if errReq == nil {
+                                       result.result = int32(resp.StatusCode)
+                                       if resp.StatusCode >= 400 {
+                                               respBody, _ := ioutil.ReadAll(resp.Body)
+                                               log.Error("Status notification request failed", log.Fields{
+                                                       "instance": instanceId,
+                                                       "name":     data.subscription.Name,
+                                                       "url":      data.subscription.CallbackUrl,
+                                                       "code":     resp.StatusCode,
+                                                       "status":   resp.Status,
+                                                       "body":     string(respBody),
+                                               })
+                                               resp.Body.Close()
+                                       }
+                               } else {
+                                       err = errReq
+                               }
+                       }
+
+                       if err != nil {
+                               log.Error("Error for status notify thread", log.Fields{
+                                       "instance": instanceId,
+                                       "name":     data.subscription.Name,
+                                       "err":      err.Error(),
+                               })
+                               result.result = 500
+                       }
+                       result.time = time.Now()
+
+                       data.notifyResult <- result
+
+               case data.action == "STOP":
+                       breakThread = true
+               }
+               if breakThread {
+                       break
+               }
+       }
+       log.Info("[STOP] status notify thread for ", log.Fields{
+               "instance": instanceId,
+       })
+}
+
+func gvkListForInstance(instance InstanceResponse, profile rb.Profile) []schema.GroupVersionKind {
+       list := make([]schema.GroupVersionKind, 0)
+       gvkMap := make(map[string]schema.GroupVersionKind)
+       gvk := schema.FromAPIVersionAndKind("v1", "Pod")
+       gvkMap[gvk.String()] = gvk
+       for _, res := range instance.Resources {
+               gvk = res.GVK
+               _, ok := gvkMap[gvk.String()]
+               if !ok {
+                       gvkMap[gvk.String()] = gvk
+               }
+       }
+       for _, gvk := range profile.ExtraResourceTypes {
+               _, ok := gvkMap[gvk.String()]
+               if !ok {
+                       gvkMap[gvk.String()] = gvk
+               }
+       }
+       for _, gvk := range gvkMap {
+               list = append(list, gvk)
+       }
+       return list
+}
+
+func runNotifyThread(instanceId, subName string) {
+       v := NewInstanceClient()
+       iss := NewInstanceStatusSubClient()
+       var status = InstanceStatus{
+               ResourceCount: -1,
+       }
+       key := StatusSubscriptionKey{
+               InstanceId:       instanceId,
+               SubscriptionName: subName,
+       }
+       time.Sleep(time.Second * 5)
+       log.Info("[START] status verification thread", log.Fields{
+               "InstanceId":       instanceId,
+               "SubscriptionName": subName,
+       })
+
+       lastChange := time.Now()
+       var timeInSeconds time.Duration = 5
+       for {
+               time.Sleep(time.Second * timeInSeconds)
+
+               lock, subData, watchers := getSubscriptionData(instanceId)
+               var changeDetected = false
+               lock.Lock()
+               watcherStatus, ok := watchers.watchersStatus[subName]
+               if ok {
+                       changeDetected = watcherStatus.lastUpdateTime.After(lastChange)
+               }
+               lock.Unlock()
+               if !ok {
+                       break
+               }
+               if changeDetected || status.ResourceCount < 0 {
+                       currentSub, err := iss.Get(instanceId, subName)
+                       if err != nil {
+                               log.Error("Error getting current status", log.Fields{
+                                       "error":    err.Error(),
+                                       "instance": instanceId})
+                               break
+                       }
+                       if currentSub.MinNotifyInterval > 5 {
+                               timeInSeconds = time.Duration(currentSub.MinNotifyInterval)
+                       } else {
+                               timeInSeconds = 5
+                       }
+                       newStatus, err := v.Status(instanceId, false)
+                       if err != nil {
+                               log.Error("Error getting current status", log.Fields{
+                                       "error":    err.Error(),
+                                       "instance": instanceId})
+                               break
+                       } else {
+                               if status.ResourceCount >= 0 {
+                                       var delta = statusDelta(status, newStatus)
+                                       if delta.Delta() {
+                                               log.Info("CHANGE DETECTED", log.Fields{
+                                                       "Instance":     instanceId,
+                                                       "Subscription": subName,
+                                               })
+                                               lastChange = watcherStatus.lastUpdateTime
+                                               for _, res := range delta.Created {
+                                                       log.Info("CREATED", log.Fields{
+                                                               "Kind": res.GVK.Kind,
+                                                               "Name": res.Name,
+                                                       })
+                                               }
+                                               for _, res := range delta.Modified {
+                                                       log.Info("MODIFIED", log.Fields{
+                                                               "Kind": res.GVK.Kind,
+                                                               "Name": res.Name,
+                                                       })
+                                               }
+                                               for _, res := range delta.Deleted {
+                                                       log.Info("DELETED", log.Fields{
+                                                               "Kind": res.GVK.Kind,
+                                                               "Name": res.Name,
+                                                       })
+                                               }
+                                               // Acquire Mutex
+                                               lock.Lock()
+                                               currentSub.LastUpdateTime = time.Now()
+                                               var notifyResultCh = make(chan notifyResult)
+                                               var newData = notifyChannelData{
+                                                       instanceId:   instanceId,
+                                                       subscription: currentSub,
+                                                       action:       "NOTIFY",
+                                                       delta:        delta,
+                                                       notifyResult: notifyResultCh,
+                                               }
+                                               subData <- newData
+                                               var notifyResult notifyResult = <-notifyResultCh
+                                               log.Info("Notification sent", log.Fields{
+                                                       "InstanceId":       instanceId,
+                                                       "SubscriptionName": subName,
+                                                       "Result":           notifyResult.result,
+                                               })
+                                               currentSub.LastNotifyStatus = notifyResult.result
+                                               currentSub.LastNotifyTime = notifyResult.time
+                                               err = db.DBconn.Update(iss.storeName, key, iss.tagInst, currentSub)
+                                               if err != nil {
+                                                       log.Error("Error updating subscription status", log.Fields{
+                                                               "error":    err.Error(),
+                                                               "instance": instanceId})
+                                               }
+                                               lock.Unlock()
+                                       }
+                               }
+
+                               status = newStatus
+                       }
+               }
+       }
+       log.Info("[STOP] status verification thread", log.Fields{
+               "InstanceId":       instanceId,
+               "SubscriptionName": subName,
+       })
+}
+
+func statusDelta(first, second InstanceStatus) resourceStatusDelta {
+       var delta resourceStatusDelta = resourceStatusDelta{
+               Created:  make([]ResourceStatus, 0),
+               Deleted:  make([]ResourceStatus, 0),
+               Modified: make([]ResourceStatus, 0),
+       }
+       var firstResList map[string]ResourceStatus = make(map[string]ResourceStatus)
+       for _, res := range first.ResourcesStatus {
+               firstResList[res.Key()] = res
+       }
+       for _, res := range second.ResourcesStatus {
+               var key string = res.Key()
+               if prevRes, ok := firstResList[key]; ok {
+                       if prevRes.Value() != res.Value() {
+                               delta.Modified = append(delta.Modified, res)
+                       }
+                       delete(firstResList, res.Key())
+               } else {
+                       delta.Created = append(delta.Created, res)
+               }
+       }
+       for _, res := range firstResList {
+               delta.Deleted = append(delta.Deleted, res)
+       }
+       return delta
+}