Fix Healthcheck API 23/119223/4
authorKonrad Bańka <k.banka@samsung.com>
Fri, 12 Mar 2021 07:46:20 +0000 (08:46 +0100)
committerKonrad Bańka <k.banka@samsung.com>
Wed, 31 Mar 2021 16:34:14 +0000 (18:34 +0200)
Fix several issues related to Healthcheck creation.
Updated GET/DELETE methods to work properly.
This commit leaves few FIXME/TODOs that will be handled within Helm3
Rebase commit

Issue-ID: MULTICLOUD-1308
Signed-off-by: Konrad Bańka <k.banka@samsung.com>
Change-Id: I5da50363bb240fdc85d3624f43cb0526786da542

src/k8splugin/api/instancehchandler.go
src/k8splugin/go.mod
src/k8splugin/internal/healthcheck/healthcheck.go
src/k8splugin/internal/healthcheck/kubeclient.go
src/k8splugin/internal/healthcheck/stream.go [new file with mode: 0644]
src/k8splugin/internal/helm/helm.go

index fc1c3be..9071517 100644 (file)
@@ -46,7 +46,7 @@ func (ih instanceHCHandler) createHandler(w http.ResponseWriter, r *http.Request
        }
 
        w.Header().Set("Content-Type", "application/json")
-       w.WriteHeader(http.StatusOK)
+       w.WriteHeader(http.StatusCreated)
        err = json.NewEncoder(w).Encode(resp)
        if err != nil {
                log.Error("Error Marshaling Reponse", log.Fields{
@@ -59,10 +59,75 @@ func (ih instanceHCHandler) createHandler(w http.ResponseWriter, r *http.Request
 }
 
 func (ih instanceHCHandler) getHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       instanceId := vars["instID"]
+       healthcheckId := vars["hcID"]
+
+       resp, err := ih.client.Get(instanceId, healthcheckId)
+       if err != nil {
+               log.Error("Error getting Instance's healthcheck", log.Fields{
+                       "error":         err,
+                       "instanceID":    instanceId,
+                       "healthcheckID": healthcheckId,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       err = json.NewEncoder(w).Encode(resp)
+       if err != nil {
+               log.Error("Error Marshaling Response", log.Fields{
+                       "error":    err,
+                       "response": resp,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
 }
 
 func (ih instanceHCHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       instanceId := vars["instID"]
+       healthcheckId := vars["hcID"]
+
+       err := ih.client.Delete(instanceId, healthcheckId)
+       if err != nil {
+               log.Error("Error deleting Instance's healthcheck", log.Fields{
+                       "error":         err,
+                       "instanceID":    instanceId,
+                       "healthcheckID": healthcheckId,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+       w.WriteHeader(http.StatusAccepted)
 }
 
 func (ih instanceHCHandler) listHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       id := vars["instID"]
+
+       resp, err := ih.client.List(id)
+       if err != nil {
+               log.Error("Error getting instance healthcheck overview", log.Fields{
+                       "error":       err,
+                       "instance-id": id,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       err = json.NewEncoder(w).Encode(resp)
+       if err != nil {
+               log.Error("Error Marshaling Response", log.Fields{
+                       "error":    err,
+                       "response": resp,
+               })
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
 }
index 5371bb5..dfb3259 100644 (file)
@@ -81,6 +81,7 @@ require (
        golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
        golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
        google.golang.org/appengine v1.5.0 // indirect
+       google.golang.org/grpc v1.19.0
        gopkg.in/gorp.v1 v1.7.2 // indirect
        gopkg.in/inf.v0 v0.9.1 // indirect
        gopkg.in/square/go-jose.v2 v2.2.2 // indirect
index 341b1db..cf1d39b 100644 (file)
@@ -15,6 +15,8 @@ package healthcheck
 
 import (
        "encoding/json"
+       "strings"
+       "time"
 
        protorelease "k8s.io/helm/pkg/proto/hapi/release"
        "k8s.io/helm/pkg/releasetesting"
@@ -27,22 +29,18 @@ import (
        pkgerrors "github.com/pkg/errors"
 )
 
-// HealthcheckState holds possible states of Healthcheck instance
-type HealthcheckState string
-
-const (
-       HcS_UNKNOWN HealthcheckState = "UNKNOWN"
-       HcS_STARTED HealthcheckState = "STARTED"
-       HcS_RUNNING HealthcheckState = "RUNNING"
-       HcS_SUCCESS HealthcheckState = "SUCCESS"
-       HcS_FAILURE HealthcheckState = "FAILURE"
+var (
+       HcS_UNKNOWN string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_UNKNOWN)]
+       HcS_RUNNING string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_RUNNING)]
+       HcS_SUCCESS string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_SUCCESS)]
+       HcS_FAILURE string = protorelease.TestRun_Status_name[int32(protorelease.TestRun_FAILURE)]
 )
 
 // InstanceHCManager interface exposes instance Healthcheck fuctionalities
 type InstanceHCManager interface {
-       Create(instanceId string) (InstanceHCStatus, error)
+       Create(instanceId string) (InstanceMiniHCStatus, error)
        Get(instanceId, healthcheckId string) (InstanceHCStatus, error)
-       List(instanceId string) ([]InstanceHCStatus, error)
+       List(instanceId string) (InstanceHCOverview, error)
        Delete(instanceId, healthcheckId string) error
 }
 
@@ -71,9 +69,24 @@ type InstanceHCClient struct {
 
 // InstanceHCStatus holds healthcheck status
 type InstanceHCStatus struct {
-       releasetesting.TestSuite
-       Id     string
-       Status HealthcheckState
+       InstanceId               string `json:"instance-id"`
+       HealthcheckId            string `json:"healthcheck-id"`
+       Status                   string `json:"status"`
+       Info                     string `json:"info"`
+       releasetesting.TestSuite `json:"test-suite"`
+}
+
+// InstanceMiniHCStatus holds only healthcheck summary
+type InstanceMiniHCStatus struct {
+       HealthcheckId string `json:"healthcheck-id"`
+       Status        string `json:"status"`
+}
+
+// InstanceHCOverview holds Healthcheck-related data
+type InstanceHCOverview struct {
+       InstanceId string                 `json:"instance-id"`
+       HCSummary  []InstanceMiniHCStatus `json:"healthcheck-summary"`
+       Hooks      []*protorelease.Hook   `json:"hooks"`
 }
 
 func NewHCClient() *InstanceHCClient {
@@ -83,7 +96,11 @@ func NewHCClient() *InstanceHCClient {
        }
 }
 
-func (ihc InstanceHCClient) Create(instanceId string) (InstanceHCStatus, error) {
+func instanceMiniHCStatusFromStatus(ihcs InstanceHCStatus) InstanceMiniHCStatus {
+       return InstanceMiniHCStatus{ihcs.HealthcheckId, ihcs.Status}
+}
+
+func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, error) {
        //Generate ID
        id := namegenerator.Generate()
 
@@ -91,67 +108,190 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceHCStatus, error)
        v := app.NewInstanceClient()
        instance, err := v.Get(instanceId)
        if err != nil {
-               return InstanceHCStatus{}, pkgerrors.Wrap(err, "Getting instance")
+               return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance")
        }
 
        //Prepare Environment, Request and Release structs
        //TODO In future could derive params from request
        client, err := NewKubeClient(instanceId, instance.Request.CloudRegion)
        if err != nil {
-               return InstanceHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient")
+               return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient")
+       }
+       key := HealthcheckKey{
+               InstanceId:    instanceId,
+               HealthcheckId: id,
        }
        env := &releasetesting.Environment{
                Namespace:  instance.Namespace,
                KubeClient: client,
                Parallel:   false,
+               Stream:     NewStream(key, ihc.storeName, ihc.tagInst),
        }
        release := protorelease.Release{
                Name:  instance.ReleaseName,
                Hooks: instance.Hooks,
        }
 
-       //Run HC
+       //Define HC
        testSuite, err := releasetesting.NewTestSuite(&release)
        if err != nil {
                log.Error("Error creating TestSuite", log.Fields{
                        "Release": release,
                })
-               return InstanceHCStatus{}, pkgerrors.Wrap(err, "Creating TestSuite")
-       }
-       if err = testSuite.Run(env); err != nil {
-               log.Error("Error running TestSuite", log.Fields{
-                       "TestSuite":   testSuite,
-                       "Environment": env,
-               })
-               return InstanceHCStatus{}, pkgerrors.Wrap(err, "Running TestSuite")
+               return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Creating TestSuite")
        }
 
        //Save state
        ihcs := InstanceHCStatus{
-               TestSuite: *testSuite,
-               Id:        id,
-               Status:    HcS_STARTED,
-       }
-       key := HealthcheckKey{
-               InstanceId:    instance.ID,
+               TestSuite:     *testSuite,
                HealthcheckId: id,
+               InstanceId:    instanceId,
+               Status:        HcS_RUNNING,
        }
        err = db.DBconn.Create(ihc.storeName, key, ihc.tagInst, ihcs)
        if err != nil {
-               return ihcs, pkgerrors.Wrap(err, "Creating Instance DB Entry")
+               return instanceMiniHCStatusFromStatus(ihcs),
+                       pkgerrors.Wrap(err, "Creating Instance DB Entry")
        }
 
-       return ihcs, nil
+       // Run HC async
+       // If testSuite doesn't fail immediately, we let it continue in background
+       errC := make(chan error, 1)
+       timeout := make(chan bool, 1)
+       // Stream handles updates of testSuite run so we don't need to care
+       var RunAsync func() = func() {
+               err := ihcs.TestSuite.Run(env)
+               if err != nil {
+                       log.Error("Error running TestSuite", log.Fields{
+                               "TestSuite":   ihcs.TestSuite,
+                               "Environment": env,
+                               "Error":       err.Error(),
+                       })
+                       ihcs.Status = HcS_FAILURE
+                       ihcs.Info = ihcs.Info + "\n" + err.Error()
+               }
+               // Send to channel before db update as it can be slow
+               errC <- err
+               // Download latest Status/Info
+               resp, err := ihc.Get(ihcs.InstanceId, ihcs.HealthcheckId)
+               if err != nil {
+                       log.Error("Error querying Healthcheck status", log.Fields{"error": err})
+                       return
+               }
+               ihcs.Status = resp.Status
+               ihcs.Info = resp.Info
+               // Update DB
+               err = db.DBconn.Update(ihc.storeName, key, ihc.tagInst, ihcs)
+               if err != nil {
+                       log.Error("Error saving Testsuite failure in DB", log.Fields{
+                               "InstanceHCStatus": ihcs,
+                               "Error":            err,
+                       })
+               }
+       }
+       go func() {
+               //Time is hardcoded but the only impact is that typically http response is sent after 2s
+               //Could be considered shorter in future
+               time.Sleep(2 * time.Second)
+               timeout <- true
+       }()
+       go RunAsync()
+       select {
+       case err := <-errC:
+               if err == nil {
+                       return instanceMiniHCStatusFromStatus(ihcs), nil
+               } else {
+                       return instanceMiniHCStatusFromStatus(ihcs), err
+               }
+       case <-timeout:
+               return instanceMiniHCStatusFromStatus(ihcs), nil
+       }
 }
 
 func (ihc InstanceHCClient) Get(instanceId, healthcheckId string) (InstanceHCStatus, error) {
-       return InstanceHCStatus{}, nil
+       key := HealthcheckKey{
+               InstanceId:    instanceId,
+               HealthcheckId: healthcheckId,
+       }
+       DBResp, err := db.DBconn.Read(ihc.storeName, key, ihc.tagInst)
+       if err != nil || DBResp == nil {
+               return InstanceHCStatus{}, pkgerrors.Wrap(err, "Error retrieving Healthcheck data")
+       }
+
+       resp := InstanceHCStatus{}
+       err = db.DBconn.Unmarshal(DBResp, &resp)
+       if err != nil {
+               return InstanceHCStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
+       }
+       return resp, nil
 }
 
 func (ihc InstanceHCClient) Delete(instanceId, healthcheckId string) error {
+       v := app.NewInstanceClient()
+       instance, err := v.Get(instanceId)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Getting instance")
+       }
+       client, err := NewKubeClient(instanceId, instance.Request.CloudRegion)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Preparing KubeClient")
+       }
+       env := &releasetesting.Environment{
+               Namespace:  instance.Namespace,
+               KubeClient: client,
+       }
+       ihcs, err := ihc.Get(instanceId, healthcheckId)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Error querying Healthcheck status")
+       }
+       env.DeleteTestPods(ihcs.TestSuite.TestManifests)
+       key := HealthcheckKey{instanceId, healthcheckId}
+       err = db.DBconn.Delete(ihc.storeName, key, ihc.tagInst)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Removing Healthcheck in DB")
+       }
        return nil
 }
 
-func (ihc InstanceHCClient) List(instanceId string) ([]InstanceHCStatus, error) {
-       return make([]InstanceHCStatus, 0), nil
+func (ihc InstanceHCClient) List(instanceId string) (InstanceHCOverview, error) {
+       ihco := InstanceHCOverview{
+               InstanceId: instanceId,
+       }
+
+       // Retrieve info about available hooks
+       v := app.NewInstanceClient()
+       instance, err := v.Get(instanceId)
+       if err != nil {
+               return ihco, pkgerrors.Wrap(err, "Getting Instance data")
+       }
+       ihco.Hooks = instance.Hooks
+
+       // Retrieve info about running/completed healthchecks
+       dbResp, err := db.DBconn.ReadAll(ihc.storeName, ihc.tagInst)
+       if err != nil {
+               if !strings.Contains(err.Error(), "Did not find any objects with tag") {
+                       return ihco, pkgerrors.Wrap(err, "Getting Healthcheck data")
+               }
+       }
+       miniStatus := make([]InstanceMiniHCStatus, 0)
+       for key, value := range dbResp {
+               //value is a byte array
+               if value != nil {
+                       resp := InstanceHCStatus{}
+                       err = db.DBconn.Unmarshal(value, &resp)
+                       if err != nil {
+                               log.Error("Error unmarshaling Instance HC data", log.Fields{
+                                       "error": err.Error(),
+                                       "key":   key})
+                       }
+                       //Filter-out healthchecks from other instances
+                       if instanceId != resp.InstanceId {
+                               continue
+                       }
+                       miniStatus = append(miniStatus, instanceMiniHCStatusFromStatus(resp))
+               }
+       }
+       ihco.HCSummary = miniStatus
+
+       return ihco, nil
 }
index be4c6fc..2a168a7 100644 (file)
@@ -25,13 +25,15 @@ import (
 
 //implements environment.KubeClient but overrides it so that
 //custom labels can be injected into created resources
-// using internal k8sClient
+//using internal k8sClient
 type KubeClientImpl struct {
        environment.KubeClient
        labels map[string]string
        k      app.KubernetesClient
 }
 
+var _ environment.KubeClient = KubeClientImpl{}
+
 func NewKubeClient(instanceId, cloudRegion string) (*KubeClientImpl, error) {
        k8sClient := app.KubernetesClient{}
        err := k8sClient.Init(cloudRegion, instanceId)
diff --git a/src/k8splugin/internal/healthcheck/stream.go b/src/k8splugin/internal/healthcheck/stream.go
new file mode 100644 (file)
index 0000000..d7c6e65
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+Copyright © 2021 Samsung Electronics
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package healthcheck
+
+import (
+       "google.golang.org/grpc"
+       "k8s.io/helm/pkg/proto/hapi/release"
+       "k8s.io/helm/pkg/proto/hapi/services"
+
+       "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+       log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
+
+       pkgerrors "github.com/pkg/errors"
+)
+
+//implements basic stream implementation that logs progress
+//and updates state in DB
+type StreamImpl struct {
+       Key               HealthcheckKey
+       StoreName         string
+       Tag               string
+       grpc.ServerStream //only to satisfy interface needs, it's not used
+}
+
+var _ services.ReleaseService_RunReleaseTestServer = StreamImpl{}
+
+func NewStream(key HealthcheckKey, store, tag string) *StreamImpl {
+       s := StreamImpl{
+               Key:       key,
+               StoreName: store,
+               Tag:       tag,
+       }
+       return &s
+}
+
+func (si StreamImpl) Send(m *services.TestReleaseResponse) error {
+       log.Info("Stream message", log.Fields{"msg": m})
+
+       DBResp, err := db.DBconn.Read(si.StoreName, si.Key, si.Tag)
+       if err != nil || DBResp == nil {
+               return pkgerrors.Wrap(err, "Error retrieving Healthcheck data")
+       }
+
+       resp := InstanceHCStatus{}
+       err = db.DBconn.Unmarshal(DBResp, &resp)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Unmarshaling Healthcheck Value")
+       }
+
+       resp.Status = release.TestRun_Status_name[int32(m.Status)]
+       if m.Msg != "" {
+               if resp.Info == "" {
+                       resp.Info = m.Msg
+               } else {
+                       resp.Info = resp.Info + "\n" + m.Msg
+               }
+       }
+
+       err = db.DBconn.Update(si.StoreName, si.Key, si.Tag, resp)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Updating Healthcheck")
+       }
+       return nil
+}
index d39e840..31047eb 100644 (file)
@@ -305,14 +305,6 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile
                        continue
                }
 
-               hook, _ := isHook(b, data)
-               // if hook is not nil, then append it to hooks list and continue
-               // if it's not, disregard error
-               if hook != nil {
-                       hookList = append(hookList, hook)
-                       continue
-               }
-
                mfilePath := filepath.Join(outputDir, m.Name)
                utils.EnsureDirectory(mfilePath)
                err = ioutil.WriteFile(mfilePath, []byte(data), 0666)
@@ -320,6 +312,14 @@ func (h *TemplateClient) GenerateKubernetesArtifacts(inputPath string, valueFile
                        return retData, hookList, err
                }
 
+               hook, _ := isHook(mfilePath, data)
+               // if hook is not nil, then append it to hooks list and continue
+               // if it's not, disregard error
+               if hook != nil {
+                       hookList = append(hookList, hook)
+                       continue
+               }
+
                gvk, err := getGroupVersionKind(data)
                if err != nil {
                        return retData, hookList, err