Context propagation for HookClient and InstanceStatusSubManager 29/142729/2
authorFiete Ostkamp <fiete.ostkamp@telekom.de>
Thu, 11 Dec 2025 11:52:59 +0000 (12:52 +0100)
committerFiete Ostkamp <fiete.ostkamp@telekom.de>
Thu, 11 Dec 2025 12:18:16 +0000 (13:18 +0100)
- pass context into HookClient and InstanceStatusSubManager
- reduce sleep interval in hookHasDeletePolicy from
  5 seconds to 500 milliseconds

Issue-ID: MULTICLOUD-1538
Change-Id: I7dd8d0b2efd10a353cee0781eef4af532978ea27
Signed-off-by: Fiete Ostkamp <fiete.ostkamp@telekom.de>
src/k8splugin/api/api.go
src/k8splugin/api/statussubhandler.go
src/k8splugin/internal/app/hook.go
src/k8splugin/internal/app/hook_test.go
src/k8splugin/internal/app/instance.go
src/k8splugin/internal/app/subscription.go
src/k8splugin/internal/healthcheck/healthcheck.go
src/k8splugin/internal/namegenerator/namegenerator.go

index e0ab60d..e1f4f04 100644 (file)
@@ -17,6 +17,7 @@ limitations under the License.
 package api
 
 import (
+       "context"
        "os"
 
        "github.com/gorilla/mux"
@@ -72,7 +73,7 @@ func NewRouter(defClient rb.DefinitionManager,
        // Status handler routes
        if subscriptionClient == nil {
                subscriptionClient = app.NewInstanceStatusSubClient()
-               subscriptionClient.RestoreWatchers()
+               subscriptionClient.RestoreWatchers(context.TODO())
        }
        instanceStatusSubHandler := instanceStatusSubHandler{client: subscriptionClient}
        instRouter.HandleFunc("/instance/{instID}/status", instHandler.statusHandler).Methods("GET")
index c5c8de2..7d9744d 100644 (file)
@@ -71,7 +71,7 @@ func (iss instanceStatusSubHandler) createHandler(w http.ResponseWriter, r *http
                return
        }
 
-       resp, err := iss.client.Create(id, subRequest)
+       resp, err := iss.client.Create(r.Context(), id, subRequest)
        if err != nil {
                log.Error("Error creating subscription", log.Fields{
                        "error":    err,
@@ -100,7 +100,7 @@ func (iss instanceStatusSubHandler) getHandler(w http.ResponseWriter, r *http.Re
        instanceId := vars["instID"]
        subId := vars["subID"]
 
-       resp, err := iss.client.Get(instanceId, subId)
+       resp, err := iss.client.Get(r.Context(), instanceId, subId)
        if err != nil {
                log.Error("Error getting instance's Status Subscription", log.Fields{
                        "error":          err,
@@ -159,7 +159,7 @@ func (iss instanceStatusSubHandler) updateHandler(w http.ResponseWriter, r *http
                return
        }
 
-       resp, err := iss.client.Update(instanceId, subId, subRequest)
+       resp, err := iss.client.Update(r.Context(), instanceId, subId, subRequest)
        if err != nil {
                log.Error("Error updating instance's Status Subscription", log.Fields{
                        "error":          err,
@@ -188,7 +188,7 @@ func (iss instanceStatusSubHandler) deleteHandler(w http.ResponseWriter, r *http
        instanceId := vars["instID"]
        subId := vars["subID"]
 
-       err := iss.client.Delete(instanceId, subId)
+       err := iss.client.Delete(r.Context(), instanceId, subId)
        if err != nil {
                log.Error("Error deleting instance's Status Subscription", log.Fields{
                        "error":          err,
@@ -205,7 +205,7 @@ func (iss instanceStatusSubHandler) listHandler(w http.ResponseWriter, r *http.R
        vars := mux.Vars(r)
        id := vars["instID"]
 
-       resp, err := iss.client.List(id)
+       resp, err := iss.client.List(r.Context(), id)
        if err != nil {
                log.Error("Error listing instance Status Subscriptions", log.Fields{
                        "error":       err,
index b419ddd..36e1365 100644 (file)
@@ -68,13 +68,15 @@ func (hc *HookClient) getHookByEvent(hs []*helm.Hook, hook release.HookEvent) []
 
 // Mimic function ExecHook in helm/pkg/tiller/release_server.go
 func (hc *HookClient) ExecHook(
+       ctx context.Context,
        k8sClient KubernetesClient,
        hs []*helm.Hook,
        hook release.HookEvent,
        timeout int64,
        startIndex int,
        dbData *InstanceDbData) error {
-       ctx := context.TODO()
+       ctx, span := tracer.Start(ctx, "HookClient.ExecHook")
+       defer span.End()
        executingHooks := hc.getHookByEvent(hs, hook)
        key := InstanceKey{
                ID: hc.id,
@@ -90,14 +92,14 @@ func (hc *HookClient) ExecHook(
                if len(h.Hook.DeletePolicies) == 0 {
                        h.Hook.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation}
                }
-               if err := hc.deleteHookByPolicy(h, release.HookBeforeHookCreation, k8sClient); err != nil {
+               if err := hc.deleteHookByPolicy(ctx, h, release.HookBeforeHookCreation, k8sClient); err != nil {
                        return err
                }
                //update DB here before the creation of the hook, if the plugin quits
                //-> when it comes back, it will continue from next hook and consider that this one is done
                if dbData != nil {
                        dbData.HookProgress = fmt.Sprintf("%d/%d", index+1, len(executingHooks))
-                       err := db.DBconn.Update(context.TODO(), hc.dbStoreName, key, hc.dbTagInst, dbData)
+                       err := db.DBconn.Update(ctx, hc.dbStoreName, key, hc.dbTagInst, dbData)
                        if err != nil {
                                return err
                        }
@@ -110,7 +112,7 @@ func (hc *HookClient) ExecHook(
                createdHook, err := k8sClient.CreateKind(ctx, resTempl, hc.kubeNameSpace)
                if err != nil {
                        log.Printf("  Instance: %s, Warning: %s hook %s, filePath: %s, error: %s", hc.id, hook, h.Hook.Name, h.KRT.FilePath, err)
-                       hc.deleteHookByPolicy(h, release.HookFailed, k8sClient)
+                       hc.deleteHookByPolicy(ctx, h, release.HookFailed, k8sClient)
                        return err
                }
                if hook != "crd-install" {
@@ -121,7 +123,7 @@ func (hc *HookClient) ExecHook(
                                if err != nil {
                                        // If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted
                                        // under failed condition. If so, then clear the corresponding resource object in the hook
-                                       if err := hc.deleteHookByPolicy(h, release.HookFailed, k8sClient); err != nil {
+                                       if err := hc.deleteHookByPolicy(ctx, h, release.HookFailed, k8sClient); err != nil {
                                                return err
                                        }
                                        return err
@@ -133,7 +135,7 @@ func (hc *HookClient) ExecHook(
        }
 
        for _, h := range executingHooks {
-               if err := hc.deleteHookByPolicy(h, release.HookSucceeded, k8sClient); err != nil {
+               if err := hc.deleteHookByPolicy(ctx, h, release.HookSucceeded, k8sClient); err != nil {
                        log.Printf("  Instance: %s, Warning: Error deleting %s hook %s based on delete policy, continue", hc.id, hook, h.Hook.Name)
                        return err
                }
@@ -142,14 +144,14 @@ func (hc *HookClient) ExecHook(
        return nil
 }
 
-func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDeletePolicy, k8sClient KubernetesClient) error {
+func (hc *HookClient) deleteHookByPolicy(ctx context.Context, h *helm.Hook, policy release.HookDeletePolicy, k8sClient KubernetesClient) error {
        rss := helm.KubernetesResource{
                GVK:  h.KRT.GVK,
                Name: h.Hook.Name,
        }
        if hookHasDeletePolicy(h, policy) {
                log.Printf("  Instance: %s, Deleting hook %s due to %q policy", hc.id, h.Hook.Name, policy)
-               if errHookDelete := k8sClient.deleteResources(context.TODO(), append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil {
+               if errHookDelete := k8sClient.deleteResources(ctx, append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil {
                        if strings.Contains(errHookDelete.Error(), "not found") {
                                return nil
                        } else {
@@ -161,7 +163,7 @@ func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDelete
                        isDeleted := false
                        for !isDeleted {
                                log.Printf("  Instance: %s, Waiting on deleting hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy)
-                               if _, err := k8sClient.GetResourceStatus(context.TODO(), rss, hc.kubeNameSpace); err != nil {
+                               if _, err := k8sClient.GetResourceStatus(ctx, rss, hc.kubeNameSpace); err != nil {
                                        if strings.Contains(err.Error(), "not found") {
                                                log.Printf("  Instance: %s, Deleted hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy)
                                                return nil
@@ -169,7 +171,7 @@ func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDelete
                                                isDeleted = true
                                        }
                                }
-                               time.Sleep(5 * time.Second)
+                               time.Sleep(500 * time.Millisecond)
                        }
                }
        }
index 79b6bcc..7cc702b 100644 (file)
@@ -247,19 +247,19 @@ func TestExecHook(t *testing.T) {
        if err != nil {
                t.Fatal(err.Error())
        }
-       err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, 10, 0, nil)
+       err = hookClient.ExecHook(context.TODO(), k8sClient, hookList, release.HookPreInstall, 10, 0, nil)
        if err != nil {
                t.Fatal(err.Error())
        }
-       err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, 10, 0, nil)
+       err = hookClient.ExecHook(context.TODO(), k8sClient, hookList, release.HookPostInstall, 10, 0, nil)
        if err != nil {
                t.Fatal(err.Error())
        }
-       err = hookClient.ExecHook(k8sClient, hookList, release.HookPreDelete, 10, 0, nil)
+       err = hookClient.ExecHook(context.TODO(), k8sClient, hookList, release.HookPreDelete, 10, 0, nil)
        if err != nil {
                t.Fatal(err.Error())
        }
-       err = hookClient.ExecHook(k8sClient, hookList, release.HookPostDelete, 10, 0, nil)
+       err = hookClient.ExecHook(context.TODO(), k8sClient, hookList, release.HookPostDelete, 10, 0, nil)
        if err != nil {
                t.Fatal(err.Error())
        }
index c7a7fd3..db77962 100644 (file)
@@ -273,7 +273,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
        var generatedId string = ""
        var finalId string = ""
        if newId == "" {
-               generatedId = namegenerator.Generate()
+               generatedId = namegenerator.Generate(ctx)
                finalId = generatedId
        } else {
                finalId = newId
@@ -284,14 +284,14 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
        //Execute the kubernetes create command
        sortedTemplates, crdList, hookList, releaseName, err := rb.NewProfileClient().Resolve(i.RBName, i.RBVersion, i.ProfileName, overrideValues, i.ReleaseName)
        if err != nil {
-               namegenerator.Release(generatedId)
+               namegenerator.Release(ctx, generatedId)
                return InstanceResponse{}, pkgerrors.Wrap(err, "Error resolving helm charts")
        }
 
        k8sClient := KubernetesClient{}
        err = k8sClient.Init(ctx, i.CloudRegion, finalId)
        if err != nil {
-               namegenerator.Release(generatedId)
+               namegenerator.Release(ctx, generatedId)
                return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
        }
 
@@ -333,7 +333,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
 
        err = k8sClient.ensureNamespace(ctx, profile.Namespace)
        if err != nil {
-               namegenerator.Release(generatedId)
+               namegenerator.Release(ctx, generatedId)
                return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace")
        }
 
@@ -342,7 +342,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
        }
        err = db.DBconn.Create(ctx, v.storeName, key, v.tagInst, dbData)
        if err != nil {
-               namegenerator.Release(generatedId)
+               namegenerator.Release(ctx, generatedId)
                return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry")
        }
 
@@ -357,14 +357,14 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
 
        hookClient := NewHookClient(profile.Namespace, finalId, v.storeName, v.tagInst)
        if len(hookClient.getHookByEvent(hookList, release.HookPreInstall)) != 0 {
-               err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, hookTimeoutInfo.preInstallTimeOut, 0, &dbData)
+               err = hookClient.ExecHook(ctx, k8sClient, hookList, release.HookPreInstall, hookTimeoutInfo.preInstallTimeOut, 0, &dbData)
                if err != nil {
                        log.Printf("Error running preinstall hooks for release %s, Error: %s. Stop here", releaseName, err)
                        err2 := db.DBconn.Delete(ctx, v.storeName, key, v.tagInst)
                        if err2 != nil {
                                log.Printf("Error cleaning failed instance in DB, please check DB.")
                        } else {
-                               namegenerator.Release(generatedId)
+                               namegenerator.Release(ctx, generatedId)
                        }
                        return InstanceResponse{}, pkgerrors.Wrap(err, "Error running preinstall hooks")
                }
@@ -377,7 +377,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
                if err2 != nil {
                        log.Printf("Delete Instance DB Entry for release %s has error.", releaseName)
                } else {
-                       namegenerator.Release(generatedId)
+                       namegenerator.Release(ctx, generatedId)
                }
                return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
        }
@@ -395,7 +395,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
                if err2 != nil {
                        log.Printf("Delete Instance DB Entry for release %s has error.", releaseName)
                } else {
-                       namegenerator.Release(generatedId)
+                       namegenerator.Release(ctx, generatedId)
                }
                return InstanceResponse{}, pkgerrors.Wrap(err, "Create Kubernetes Resources")
        }
@@ -421,7 +421,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
                go func() {
                        dbData.Status = "POST-INSTALL"
                        dbData.HookProgress = ""
-                       err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, hookTimeoutInfo.postInstallTimeOut, 0, &dbData)
+                       err = hookClient.ExecHook(ctx, k8sClient, hookList, release.HookPostInstall, hookTimeoutInfo.postInstallTimeOut, 0, &dbData)
                        if err != nil {
                                dbData.Status = "POST-INSTALL-FAILED"
                                log.Printf("  Instance: %s, Error running postinstall hooks error: %s", finalId, err)
@@ -506,7 +506,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
                                if err2 != nil {
                                        log.Printf("Delete of the temporal instance from the DB has failed %s", err2.Error())
                                }
-                               namegenerator.Release(newInstance.ID)
+                               namegenerator.Release(ctx, newInstance.ID)
                                newInstanceDb.ID = id
                                newInstance.ID = id
                                err = db.DBconn.Create(ctx, v.storeName, key, v.tagInst, newInstanceDb)
@@ -605,7 +605,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
 
        hookClient := NewHookClient(profile.Namespace, id, v.storeName, v.tagInst)
        if len(hookClient.getHookByEvent(hookList, release.HookPreUpgrade)) != 0 {
-               err = hookClient.ExecHook(k8sClient, hookList, release.HookPreUpgrade, hookTimeoutInfo.preUpgradeTimeout, 0, &dbData)
+               err = hookClient.ExecHook(ctx, k8sClient, hookList, release.HookPreUpgrade, hookTimeoutInfo.preUpgradeTimeout, 0, &dbData)
                if err != nil {
                        log.Printf("Error running preupgrade hooks for release %s, Error: %s. Stop here", releaseName, err)
                        return InstanceResponse{}, pkgerrors.Wrap(err, "Error running preupgrade hooks")
@@ -683,7 +683,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
                go func() {
                        dbData.Status = "POST-UPGRADE"
                        dbData.HookProgress = ""
-                       err = hookClient.ExecHook(k8sClient, hookList, release.HookPostUpgrade, hookTimeoutInfo.postUpgradeTimeout, 0, &dbData)
+                       err = hookClient.ExecHook(ctx, k8sClient, hookList, release.HookPostUpgrade, hookTimeoutInfo.postUpgradeTimeout, 0, &dbData)
                        if err != nil {
                                dbData.Status = "POST-UPGRADE-FAILED"
                                log.Printf("  Instance: %s, Error running postupgrade hooks error: %s", id, err)
@@ -1120,7 +1120,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error {
 
        hookClient := NewHookClient(inst.Namespace, id, v.storeName, v.tagInst)
        if len(hookClient.getHookByEvent(inst.Hooks, release.HookPreDelete)) != 0 {
-               err = hookClient.ExecHook(k8sClient, inst.Hooks, release.HookPreDelete, inst.PreDeleteTimeout, 0, &inst)
+               err = hookClient.ExecHook(ctx, k8sClient, inst.Hooks, release.HookPreDelete, inst.PreDeleteTimeout, 0, &inst)
                if err != nil {
                        log.Printf("  Instance: %s, Error running pre-delete hooks error: %s", id, err)
                        inst.Status = "PRE-DELETE-FAILED"
@@ -1157,7 +1157,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error {
                }()
        } else {
                subscriptionClient := NewInstanceStatusSubClient()
-               err = subscriptionClient.Cleanup(id)
+               err = subscriptionClient.Cleanup(ctx, id)
                if err != nil {
                        log.Print(err.Error())
                }
@@ -1232,7 +1232,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
                //Plugin quits during post-install hooks -> continue
                go func() {
                        log.Printf("  The plugin quits during post-install hook of this instance, continue post-install hook")
-                       err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostInstall, instance.PostInstallTimeout, completedHooks, &instance)
+                       err = hookClient.ExecHook(ctx, k8sClient, instance.Hooks, release.HookPostInstall, instance.PostInstallTimeout, completedHooks, &instance)
                        log.Printf("dbData.HookProgress %s", instance.HookProgress)
                        if err != nil {
                                instance.Status = "POST-INSTALL-FAILED"
@@ -1249,7 +1249,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
                //Plugin quits during pre-delete hooks -> This already effects the instance -> should continue the deletion
                go func() {
                        log.Printf("  The plugin quits during pre-delete hook of this instance, continue pre-delete hook")
-                       err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPreDelete, instance.PreDeleteTimeout, completedHooks, &instance)
+                       err = hookClient.ExecHook(ctx, k8sClient, instance.Hooks, release.HookPreDelete, instance.PreDeleteTimeout, completedHooks, &instance)
                        if err != nil {
                                log.Printf("  Instance: %s, Error running pre-delete hooks error: %s", id, err)
                                instance.Status = "PRE-DELETE-FAILED"
@@ -1298,7 +1298,7 @@ func (v *InstanceClient) runPostDelete(ctx context.Context, k8sClient Kubernetes
        if err != nil {
                log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
        }
-       err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostDelete, instance.PostDeleteTimeout, startIndex, instance)
+       err = hookClient.ExecHook(ctx, k8sClient, instance.Hooks, release.HookPostDelete, instance.PostDeleteTimeout, startIndex, instance)
        if err != nil {
                //If this case happen, user should clean the cluster
                log.Printf("  Instance: %s, Error running post-delete hooks error: %s", instance.ID, err)
@@ -1312,7 +1312,7 @@ func (v *InstanceClient) runPostDelete(ctx context.Context, k8sClient Kubernetes
        }
        if clearDb {
                subscriptionClient := NewInstanceStatusSubClient()
-               err = subscriptionClient.Cleanup(instance.ID)
+               err = subscriptionClient.Cleanup(ctx, instance.ID)
                if err != nil {
                        log.Print(err.Error())
                }
index 48ecc00..6b4e565 100644 (file)
@@ -137,19 +137,19 @@ var subscriptionNotifyData = subscriptionNotifyManager{
 
 // InstanceStatusSubManager is an interface exposes the status subscription functionality
 type InstanceStatusSubManager interface {
-       Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error)
-       Get(instanceId, subId string) (StatusSubscription, error)
-       Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error)
-       List(instanceId string) ([]StatusSubscription, error)
-       Delete(instanceId, subId string) error
-       Cleanup(instanceId string) error
-       RestoreWatchers()
+       Create(ctx context.Context, instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error)
+       Get(ctx context.Context, instanceId, subId string) (StatusSubscription, error)
+       Update(ctx context.Context, instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error)
+       List(ctx context.Context, instanceId string) ([]StatusSubscription, error)
+       Delete(ctx context.Context, instanceId, subId string) error
+       Cleanup(ctx context.Context, instanceId string) error
+       RestoreWatchers(ctx context.Context)
 }
 
 // Create Status Subscription
-func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) {
+func (iss *InstanceStatusSubClient) Create(ctx context.Context, instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) {
 
-       _, err := iss.Get(instanceId, subDetails.Name)
+       _, err := iss.Get(ctx, instanceId, subDetails.Name)
        if err == nil {
                return StatusSubscription{}, pkgerrors.New("Subscription already exists")
        }
@@ -182,7 +182,7 @@ func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails Subscri
        lock.Lock()
        defer lock.Unlock()
 
-       err = db.DBconn.Create(context.TODO(), iss.storeName, key, iss.tagInst, sub)
+       err = db.DBconn.Create(ctx, iss.storeName, key, iss.tagInst, sub)
        if err != nil {
                return sub, pkgerrors.Wrap(err, "Creating Status Subscription DB Entry")
        }
@@ -191,13 +191,13 @@ func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails Subscri
                "SubscriptionName": subDetails.Name,
        })
 
-       go runNotifyThread(instanceId, sub.Name)
+       go runNotifyThread(ctx, instanceId, sub.Name)
 
        return sub, nil
 }
 
 // Get Status subscription
-func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscription, error) {
+func (iss *InstanceStatusSubClient) Get(ctx context.Context, instanceId, subId string) (StatusSubscription, error) {
        lock, _, _ := getSubscriptionData(instanceId)
        // Acquire Mutex
        lock.Lock()
@@ -206,7 +206,7 @@ func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscri
                InstanceId:       instanceId,
                SubscriptionName: subId,
        }
-       DBResp, err := db.DBconn.Read(context.TODO(), iss.storeName, key, iss.tagInst)
+       DBResp, err := db.DBconn.Read(ctx, iss.storeName, key, iss.tagInst)
        if err != nil || DBResp == nil {
                return StatusSubscription{}, pkgerrors.Wrap(err, "Error retrieving Subscription data")
        }
@@ -220,8 +220,8 @@ func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscri
 }
 
 // Update status subscription
-func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) {
-       sub, err := iss.Get(instanceId, subDetails.Name)
+func (iss *InstanceStatusSubClient) Update(ctx context.Context, instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) {
+       sub, err := iss.Get(ctx, instanceId, subDetails.Name)
        if err != nil {
                return StatusSubscription{}, pkgerrors.Wrap(err, "Subscription does not exist")
        }
@@ -248,7 +248,7 @@ func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails
        lock.Lock()
        defer lock.Unlock()
 
-       err = db.DBconn.Update(context.TODO(), iss.storeName, key, iss.tagInst, sub)
+       err = db.DBconn.Update(ctx, iss.storeName, key, iss.tagInst, sub)
        if err != nil {
                return sub, pkgerrors.Wrap(err, "Updating Status Subscription DB Entry")
        }
@@ -261,14 +261,14 @@ func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails
 }
 
 // Get list of status subscriptions
-func (iss *InstanceStatusSubClient) List(instanceId string) ([]StatusSubscription, error) {
+func (iss *InstanceStatusSubClient) List(ctx context.Context, instanceId string) ([]StatusSubscription, error) {
 
        lock, _, _ := getSubscriptionData(instanceId)
        // Acquire Mutex
        lock.Lock()
        defer lock.Unlock()
        // Retrieve info about created status subscriptions
-       dbResp, err := db.DBconn.ReadAll(context.TODO(), iss.storeName, iss.tagInst)
+       dbResp, err := db.DBconn.ReadAll(ctx, iss.storeName, iss.tagInst)
        if err != nil {
                if !strings.Contains(err.Error(), "Did not find any objects with tag") {
                        return []StatusSubscription{}, pkgerrors.Wrap(err, "Getting Status Subscription data")
@@ -306,8 +306,8 @@ func (iss *InstanceStatusSubClient) List(instanceId string) ([]StatusSubscriptio
 }
 
 // Delete status subscription
-func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error {
-       _, err := iss.Get(instanceId, subId)
+func (iss *InstanceStatusSubClient) Delete(ctx context.Context, instanceId, subId string) error {
+       _, err := iss.Get(ctx, instanceId, subId)
        if err != nil {
                return pkgerrors.Wrap(err, "Subscription does not exist")
        }
@@ -323,7 +323,7 @@ func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error {
                InstanceId:       instanceId,
                SubscriptionName: subId,
        }
-       err = db.DBconn.Delete(context.TODO(), iss.storeName, key, iss.tagInst)
+       err = db.DBconn.Delete(ctx, iss.storeName, key, iss.tagInst)
        if err != nil {
                return pkgerrors.Wrap(err, "Removing Status Subscription in DB")
        }
@@ -331,14 +331,14 @@ func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error {
 }
 
 // Cleanup status subscriptions for instance
-func (iss *InstanceStatusSubClient) Cleanup(instanceId string) error {
-       subList, err := iss.List(instanceId)
+func (iss *InstanceStatusSubClient) Cleanup(ctx context.Context, instanceId string) error {
+       subList, err := iss.List(ctx, instanceId)
        if err != nil {
                return err
        }
 
        for _, sub := range subList {
-               err = iss.Delete(instanceId, sub.Name)
+               err = iss.Delete(ctx, instanceId, sub.Name)
                if err != nil {
                        log.Error("Error deleting ", log.Fields{
                                "error": err.Error(),
@@ -350,19 +350,19 @@ func (iss *InstanceStatusSubClient) Cleanup(instanceId string) error {
 }
 
 // Restore status subscriptions notify threads
-func (iss *InstanceStatusSubClient) RestoreWatchers() {
+func (iss *InstanceStatusSubClient) RestoreWatchers(ctx context.Context) {
        go func() {
                time.Sleep(time.Second * 10)
                log.Info("Restoring status subscription notifications", log.Fields{})
                v := NewInstanceClient()
-               instances, err := v.List(context.TODO(), "", "", "")
+               instances, err := v.List(ctx, "", "", "")
                if err != nil {
                        log.Error("Error reading instance list", log.Fields{
                                "error": err.Error(),
                        })
                }
                for _, instance := range instances {
-                       subList, err := iss.List(instance.ID)
+                       subList, err := iss.List(ctx, instance.ID)
                        if err != nil {
                                log.Error("Error reading subscription list for instance", log.Fields{
                                        "error":    err.Error(),
@@ -381,7 +381,7 @@ func (iss *InstanceStatusSubClient) RestoreWatchers() {
                                        })
                                        continue
                                }
-                               go runNotifyThread(instance.ID, sub.Name)
+                               go runNotifyThread(ctx, instance.ID, sub.Name)
                        }
                }
        }()
@@ -608,7 +608,7 @@ func gvkListForInstance(instance InstanceResponse, profile rb.Profile) []schema.
        return list
 }
 
-func runNotifyThread(instanceId, subName string) {
+func runNotifyThread(ctx context.Context, instanceId, subName string) {
        v := NewInstanceClient()
        iss := NewInstanceStatusSubClient()
        var status = InstanceStatus{
@@ -641,7 +641,7 @@ func runNotifyThread(instanceId, subName string) {
                        break
                }
                if changeDetected || status.ResourceCount < 0 {
-                       currentSub, err := iss.Get(instanceId, subName)
+                       currentSub, err := iss.Get(ctx, instanceId, subName)
                        if err != nil {
                                log.Error("Error getting current status", log.Fields{
                                        "error":    err.Error(),
@@ -653,7 +653,7 @@ func runNotifyThread(instanceId, subName string) {
                        } else {
                                timeInSeconds = 5
                        }
-                       newStatus, err := v.Status(context.Background(), instanceId, false)
+                       newStatus, err := v.Status(ctx, instanceId, false)
                        if err != nil {
                                log.Error("Error getting current status", log.Fields{
                                        "error":    err.Error(),
@@ -706,7 +706,7 @@ func runNotifyThread(instanceId, subName string) {
                                                })
                                                currentSub.LastNotifyStatus = notifyResult.result
                                                currentSub.LastNotifyTime = notifyResult.time
-                                               err = db.DBconn.Update(context.TODO(), iss.storeName, key, iss.tagInst, currentSub)
+                                               err = db.DBconn.Update(ctx, iss.storeName, key, iss.tagInst, currentSub)
                                                if err != nil {
                                                        log.Error("Error updating subscription status", log.Fields{
                                                                "error":    err.Error(),
index 49d8f1e..622bb2d 100644 (file)
@@ -19,6 +19,7 @@ import (
        "strings"
        "sync"
 
+       "go.opentelemetry.io/otel"
        "helm.sh/helm/v3/pkg/release"
        "helm.sh/helm/v3/pkg/time"
 
@@ -90,6 +91,8 @@ type InstanceHCOverview struct {
        Hooks      []*helm.Hook           `json:"hooks"`
 }
 
+var tracer = otel.Tracer("internal/healthcheck")
+
 func NewHCClient() *InstanceHCClient {
        return &InstanceHCClient{
                storeName: "rbdef",
@@ -102,14 +105,15 @@ func instanceMiniHCStatusFromStatus(ihcs InstanceHCStatus) InstanceMiniHCStatus
 }
 
 func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, error) {
+       ctx, span := tracer.Start(context.TODO(), "InstanceHCClient.Create")
+       defer span.End()
        //TODO Handle hook delete policies
 
        //Generate ID
-       id := namegenerator.Generate()
+       id := namegenerator.Generate(ctx)
 
        //Determine Cloud Region and namespace
        v := app.NewInstanceClient()
-       ctx := context.TODO()
        instance, err := v.Get(ctx, instanceId)
        if err != nil {
                return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance")
@@ -202,7 +206,6 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err
                                })
                                update <- true
                                wg.Done()
-                               return
                        }(h.Status)
                }
                go func() {
@@ -212,7 +215,6 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err
                                "InstanceId":    instanceId,
                        })
                        update <- false
-                       return
                }()
                for {
                        select {
index 856a60f..3a6f3ef 100644 (file)
@@ -100,14 +100,14 @@ func (c *cache) readCacheFromDB() error {
 }
 
 // writeCacheToDB will update the DB with the updated cache
-func (c *cache) writeCacheToDB() {
+func (c *cache) writeCacheToDB(ctx context.Context) {
 
        //Update the database as well
-       err := db.DBconn.Update(context.TODO(), storeName, cacheKeyGlobal, tag, c.cache)
+       err := db.DBconn.Update(ctx, storeName, cacheKeyGlobal, tag, c.cache)
        if err != nil {
                // TODO: Replace with DBconn variable
                if strings.Contains(err.Error(), "Error finding master table") {
-                       err = db.DBconn.Create(context.TODO(), storeName, cacheKeyGlobal, tag, c.cache)
+                       err = db.DBconn.Create(ctx, storeName, cacheKeyGlobal, tag, c.cache)
                        if err != nil {
                                log.Println("Error creating the entry in DB. Will try later...")
                                return
@@ -119,7 +119,7 @@ func (c *cache) writeCacheToDB() {
        }
 }
 
-func (c *cache) generateName() string {
+func (c *cache) generateName(ctx context.Context) string {
        c.mux.Lock()
        defer c.mux.Unlock()
 
@@ -137,12 +137,12 @@ func (c *cache) generateName() string {
                c.cache[name] = true
 
                // Update the cache and db
-               c.writeCacheToDB()
+               c.writeCacheToDB(ctx)
                return name
        }
 }
 
-func (c *cache) releaseName(name string) {
+func (c *cache) releaseName(ctx context.Context, name string) {
        if name == "" {
                return
        }
@@ -155,18 +155,18 @@ func (c *cache) releaseName(name string) {
                c.cache[name] = false
 
                // Update the cache and db
-               c.writeCacheToDB()
+               c.writeCacheToDB(ctx)
        }
 }
 
 // Generate returns an autogenerated name
-func Generate() string {
+func Generate(ctx context.Context) string {
 
-       return nameCache.generateName()
+       return nameCache.generateName(ctx)
 }
 
 // Release name from cache
-func Release(name string) {
+func Release(ctx context.Context, name string) {
 
-       nameCache.releaseName(name)
+       nameCache.releaseName(ctx, name)
 }