From d5ec45b8811ef52549ff395ee44eef65c8fa51dc Mon Sep 17 00:00:00 2001 From: Fiete Ostkamp Date: Thu, 11 Dec 2025 12:52:59 +0100 Subject: [PATCH] Context propagation for HookClient and InstanceStatusSubManager - pass context into HookClient and InstanceStatusSubManager - reduce sleep interval in hookHasDeletePolicy from 5 seconds to 500 milliseconds Issue-ID: MULTICLOUD-1538 Change-Id: I7dd8d0b2efd10a353cee0781eef4af532978ea27 Signed-off-by: Fiete Ostkamp --- src/k8splugin/api/api.go | 3 +- src/k8splugin/api/statussubhandler.go | 10 ++-- src/k8splugin/internal/app/hook.go | 22 ++++---- src/k8splugin/internal/app/hook_test.go | 8 +-- src/k8splugin/internal/app/instance.go | 38 ++++++------- src/k8splugin/internal/app/subscription.go | 64 +++++++++++----------- src/k8splugin/internal/healthcheck/healthcheck.go | 10 ++-- .../internal/namegenerator/namegenerator.go | 22 ++++---- 8 files changed, 91 insertions(+), 86 deletions(-) diff --git a/src/k8splugin/api/api.go b/src/k8splugin/api/api.go index e0ab60d8..e1f4f04e 100644 --- a/src/k8splugin/api/api.go +++ b/src/k8splugin/api/api.go @@ -17,6 +17,7 @@ limitations under the License. package api import ( + "context" "os" "github.com/gorilla/mux" @@ -72,7 +73,7 @@ func NewRouter(defClient rb.DefinitionManager, // Status handler routes if subscriptionClient == nil { subscriptionClient = app.NewInstanceStatusSubClient() - subscriptionClient.RestoreWatchers() + subscriptionClient.RestoreWatchers(context.TODO()) } instanceStatusSubHandler := instanceStatusSubHandler{client: subscriptionClient} instRouter.HandleFunc("/instance/{instID}/status", instHandler.statusHandler).Methods("GET") diff --git a/src/k8splugin/api/statussubhandler.go b/src/k8splugin/api/statussubhandler.go index c5c8de23..7d9744d3 100644 --- a/src/k8splugin/api/statussubhandler.go +++ b/src/k8splugin/api/statussubhandler.go @@ -71,7 +71,7 @@ func (iss instanceStatusSubHandler) createHandler(w http.ResponseWriter, r *http return } - resp, err := iss.client.Create(id, subRequest) + resp, err := iss.client.Create(r.Context(), id, subRequest) if err != nil { log.Error("Error creating subscription", log.Fields{ "error": err, @@ -100,7 +100,7 @@ func (iss instanceStatusSubHandler) getHandler(w http.ResponseWriter, r *http.Re instanceId := vars["instID"] subId := vars["subID"] - resp, err := iss.client.Get(instanceId, subId) + resp, err := iss.client.Get(r.Context(), instanceId, subId) if err != nil { log.Error("Error getting instance's Status Subscription", log.Fields{ "error": err, @@ -159,7 +159,7 @@ func (iss instanceStatusSubHandler) updateHandler(w http.ResponseWriter, r *http return } - resp, err := iss.client.Update(instanceId, subId, subRequest) + resp, err := iss.client.Update(r.Context(), instanceId, subId, subRequest) if err != nil { log.Error("Error updating instance's Status Subscription", log.Fields{ "error": err, @@ -188,7 +188,7 @@ func (iss instanceStatusSubHandler) deleteHandler(w http.ResponseWriter, r *http instanceId := vars["instID"] subId := vars["subID"] - err := iss.client.Delete(instanceId, subId) + err := iss.client.Delete(r.Context(), instanceId, subId) if err != nil { log.Error("Error deleting instance's Status Subscription", log.Fields{ "error": err, @@ -205,7 +205,7 @@ func (iss instanceStatusSubHandler) listHandler(w http.ResponseWriter, r *http.R vars := mux.Vars(r) id := vars["instID"] - resp, err := iss.client.List(id) + resp, err := iss.client.List(r.Context(), id) if err != nil { log.Error("Error listing instance Status Subscriptions", log.Fields{ "error": err, diff --git a/src/k8splugin/internal/app/hook.go b/src/k8splugin/internal/app/hook.go index b419ddda..36e13656 100644 --- a/src/k8splugin/internal/app/hook.go +++ b/src/k8splugin/internal/app/hook.go @@ -68,13 +68,15 @@ func (hc *HookClient) getHookByEvent(hs []*helm.Hook, hook release.HookEvent) [] // Mimic function ExecHook in helm/pkg/tiller/release_server.go func (hc *HookClient) ExecHook( + ctx context.Context, k8sClient KubernetesClient, hs []*helm.Hook, hook release.HookEvent, timeout int64, startIndex int, dbData *InstanceDbData) error { - ctx := context.TODO() + ctx, span := tracer.Start(ctx, "HookClient.ExecHook") + defer span.End() executingHooks := hc.getHookByEvent(hs, hook) key := InstanceKey{ ID: hc.id, @@ -90,14 +92,14 @@ func (hc *HookClient) ExecHook( if len(h.Hook.DeletePolicies) == 0 { h.Hook.DeletePolicies = []release.HookDeletePolicy{release.HookBeforeHookCreation} } - if err := hc.deleteHookByPolicy(h, release.HookBeforeHookCreation, k8sClient); err != nil { + if err := hc.deleteHookByPolicy(ctx, h, release.HookBeforeHookCreation, k8sClient); err != nil { return err } //update DB here before the creation of the hook, if the plugin quits //-> when it comes back, it will continue from next hook and consider that this one is done if dbData != nil { dbData.HookProgress = fmt.Sprintf("%d/%d", index+1, len(executingHooks)) - err := db.DBconn.Update(context.TODO(), hc.dbStoreName, key, hc.dbTagInst, dbData) + err := db.DBconn.Update(ctx, hc.dbStoreName, key, hc.dbTagInst, dbData) if err != nil { return err } @@ -110,7 +112,7 @@ func (hc *HookClient) ExecHook( createdHook, err := k8sClient.CreateKind(ctx, resTempl, hc.kubeNameSpace) if err != nil { log.Printf(" Instance: %s, Warning: %s hook %s, filePath: %s, error: %s", hc.id, hook, h.Hook.Name, h.KRT.FilePath, err) - hc.deleteHookByPolicy(h, release.HookFailed, k8sClient) + hc.deleteHookByPolicy(ctx, h, release.HookFailed, k8sClient) return err } if hook != "crd-install" { @@ -121,7 +123,7 @@ func (hc *HookClient) ExecHook( if err != nil { // If a hook is failed, check the annotation of the hook to determine whether the hook should be deleted // under failed condition. If so, then clear the corresponding resource object in the hook - if err := hc.deleteHookByPolicy(h, release.HookFailed, k8sClient); err != nil { + if err := hc.deleteHookByPolicy(ctx, h, release.HookFailed, k8sClient); err != nil { return err } return err @@ -133,7 +135,7 @@ func (hc *HookClient) ExecHook( } for _, h := range executingHooks { - if err := hc.deleteHookByPolicy(h, release.HookSucceeded, k8sClient); err != nil { + if err := hc.deleteHookByPolicy(ctx, h, release.HookSucceeded, k8sClient); err != nil { log.Printf(" Instance: %s, Warning: Error deleting %s hook %s based on delete policy, continue", hc.id, hook, h.Hook.Name) return err } @@ -142,14 +144,14 @@ func (hc *HookClient) ExecHook( return nil } -func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDeletePolicy, k8sClient KubernetesClient) error { +func (hc *HookClient) deleteHookByPolicy(ctx context.Context, h *helm.Hook, policy release.HookDeletePolicy, k8sClient KubernetesClient) error { rss := helm.KubernetesResource{ GVK: h.KRT.GVK, Name: h.Hook.Name, } if hookHasDeletePolicy(h, policy) { log.Printf(" Instance: %s, Deleting hook %s due to %q policy", hc.id, h.Hook.Name, policy) - if errHookDelete := k8sClient.deleteResources(context.TODO(), append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil { + if errHookDelete := k8sClient.deleteResources(ctx, append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil { if strings.Contains(errHookDelete.Error(), "not found") { return nil } else { @@ -161,7 +163,7 @@ func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDelete isDeleted := false for !isDeleted { log.Printf(" Instance: %s, Waiting on deleting hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy) - if _, err := k8sClient.GetResourceStatus(context.TODO(), rss, hc.kubeNameSpace); err != nil { + if _, err := k8sClient.GetResourceStatus(ctx, rss, hc.kubeNameSpace); err != nil { if strings.Contains(err.Error(), "not found") { log.Printf(" Instance: %s, Deleted hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy) return nil @@ -169,7 +171,7 @@ func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDelete isDeleted = true } } - time.Sleep(5 * time.Second) + time.Sleep(500 * time.Millisecond) } } } diff --git a/src/k8splugin/internal/app/hook_test.go b/src/k8splugin/internal/app/hook_test.go index 79b6bcc2..7cc702bd 100644 --- a/src/k8splugin/internal/app/hook_test.go +++ b/src/k8splugin/internal/app/hook_test.go @@ -247,19 +247,19 @@ func TestExecHook(t *testing.T) { if err != nil { t.Fatal(err.Error()) } - err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, 10, 0, nil) + err = hookClient.ExecHook(context.TODO(), k8sClient, hookList, release.HookPreInstall, 10, 0, nil) if err != nil { t.Fatal(err.Error()) } - err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, 10, 0, nil) + err = hookClient.ExecHook(context.TODO(), k8sClient, hookList, release.HookPostInstall, 10, 0, nil) if err != nil { t.Fatal(err.Error()) } - err = hookClient.ExecHook(k8sClient, hookList, release.HookPreDelete, 10, 0, nil) + err = hookClient.ExecHook(context.TODO(), k8sClient, hookList, release.HookPreDelete, 10, 0, nil) if err != nil { t.Fatal(err.Error()) } - err = hookClient.ExecHook(k8sClient, hookList, release.HookPostDelete, 10, 0, nil) + err = hookClient.ExecHook(context.TODO(), k8sClient, hookList, release.HookPostDelete, 10, 0, nil) if err != nil { t.Fatal(err.Error()) } diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index c7a7fd35..db779622 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -273,7 +273,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st var generatedId string = "" var finalId string = "" if newId == "" { - generatedId = namegenerator.Generate() + generatedId = namegenerator.Generate(ctx) finalId = generatedId } else { finalId = newId @@ -284,14 +284,14 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st //Execute the kubernetes create command sortedTemplates, crdList, hookList, releaseName, err := rb.NewProfileClient().Resolve(i.RBName, i.RBVersion, i.ProfileName, overrideValues, i.ReleaseName) if err != nil { - namegenerator.Release(generatedId) + namegenerator.Release(ctx, generatedId) return InstanceResponse{}, pkgerrors.Wrap(err, "Error resolving helm charts") } k8sClient := KubernetesClient{} err = k8sClient.Init(ctx, i.CloudRegion, finalId) if err != nil { - namegenerator.Release(generatedId) + namegenerator.Release(ctx, generatedId) return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") } @@ -333,7 +333,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st err = k8sClient.ensureNamespace(ctx, profile.Namespace) if err != nil { - namegenerator.Release(generatedId) + namegenerator.Release(ctx, generatedId) return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace") } @@ -342,7 +342,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st } err = db.DBconn.Create(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { - namegenerator.Release(generatedId) + namegenerator.Release(ctx, generatedId) return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry") } @@ -357,14 +357,14 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st hookClient := NewHookClient(profile.Namespace, finalId, v.storeName, v.tagInst) if len(hookClient.getHookByEvent(hookList, release.HookPreInstall)) != 0 { - err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, hookTimeoutInfo.preInstallTimeOut, 0, &dbData) + err = hookClient.ExecHook(ctx, k8sClient, hookList, release.HookPreInstall, hookTimeoutInfo.preInstallTimeOut, 0, &dbData) if err != nil { log.Printf("Error running preinstall hooks for release %s, Error: %s. Stop here", releaseName, err) err2 := db.DBconn.Delete(ctx, v.storeName, key, v.tagInst) if err2 != nil { log.Printf("Error cleaning failed instance in DB, please check DB.") } else { - namegenerator.Release(generatedId) + namegenerator.Release(ctx, generatedId) } return InstanceResponse{}, pkgerrors.Wrap(err, "Error running preinstall hooks") } @@ -377,7 +377,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st if err2 != nil { log.Printf("Delete Instance DB Entry for release %s has error.", releaseName) } else { - namegenerator.Release(generatedId) + namegenerator.Release(ctx, generatedId) } return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry") } @@ -395,7 +395,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st if err2 != nil { log.Printf("Delete Instance DB Entry for release %s has error.", releaseName) } else { - namegenerator.Release(generatedId) + namegenerator.Release(ctx, generatedId) } return InstanceResponse{}, pkgerrors.Wrap(err, "Create Kubernetes Resources") } @@ -421,7 +421,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st go func() { dbData.Status = "POST-INSTALL" dbData.HookProgress = "" - err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, hookTimeoutInfo.postInstallTimeOut, 0, &dbData) + err = hookClient.ExecHook(ctx, k8sClient, hookList, release.HookPostInstall, hookTimeoutInfo.postInstallTimeOut, 0, &dbData) if err != nil { dbData.Status = "POST-INSTALL-FAILED" log.Printf(" Instance: %s, Error running postinstall hooks error: %s", finalId, err) @@ -506,7 +506,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques if err2 != nil { log.Printf("Delete of the temporal instance from the DB has failed %s", err2.Error()) } - namegenerator.Release(newInstance.ID) + namegenerator.Release(ctx, newInstance.ID) newInstanceDb.ID = id newInstance.ID = id err = db.DBconn.Create(ctx, v.storeName, key, v.tagInst, newInstanceDb) @@ -605,7 +605,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques hookClient := NewHookClient(profile.Namespace, id, v.storeName, v.tagInst) if len(hookClient.getHookByEvent(hookList, release.HookPreUpgrade)) != 0 { - err = hookClient.ExecHook(k8sClient, hookList, release.HookPreUpgrade, hookTimeoutInfo.preUpgradeTimeout, 0, &dbData) + err = hookClient.ExecHook(ctx, k8sClient, hookList, release.HookPreUpgrade, hookTimeoutInfo.preUpgradeTimeout, 0, &dbData) if err != nil { log.Printf("Error running preupgrade hooks for release %s, Error: %s. Stop here", releaseName, err) return InstanceResponse{}, pkgerrors.Wrap(err, "Error running preupgrade hooks") @@ -683,7 +683,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques go func() { dbData.Status = "POST-UPGRADE" dbData.HookProgress = "" - err = hookClient.ExecHook(k8sClient, hookList, release.HookPostUpgrade, hookTimeoutInfo.postUpgradeTimeout, 0, &dbData) + err = hookClient.ExecHook(ctx, k8sClient, hookList, release.HookPostUpgrade, hookTimeoutInfo.postUpgradeTimeout, 0, &dbData) if err != nil { dbData.Status = "POST-UPGRADE-FAILED" log.Printf(" Instance: %s, Error running postupgrade hooks error: %s", id, err) @@ -1120,7 +1120,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error { hookClient := NewHookClient(inst.Namespace, id, v.storeName, v.tagInst) if len(hookClient.getHookByEvent(inst.Hooks, release.HookPreDelete)) != 0 { - err = hookClient.ExecHook(k8sClient, inst.Hooks, release.HookPreDelete, inst.PreDeleteTimeout, 0, &inst) + err = hookClient.ExecHook(ctx, k8sClient, inst.Hooks, release.HookPreDelete, inst.PreDeleteTimeout, 0, &inst) if err != nil { log.Printf(" Instance: %s, Error running pre-delete hooks error: %s", id, err) inst.Status = "PRE-DELETE-FAILED" @@ -1157,7 +1157,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error { }() } else { subscriptionClient := NewInstanceStatusSubClient() - err = subscriptionClient.Cleanup(id) + err = subscriptionClient.Cleanup(ctx, id) if err != nil { log.Print(err.Error()) } @@ -1232,7 +1232,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e //Plugin quits during post-install hooks -> continue go func() { log.Printf(" The plugin quits during post-install hook of this instance, continue post-install hook") - err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostInstall, instance.PostInstallTimeout, completedHooks, &instance) + err = hookClient.ExecHook(ctx, k8sClient, instance.Hooks, release.HookPostInstall, instance.PostInstallTimeout, completedHooks, &instance) log.Printf("dbData.HookProgress %s", instance.HookProgress) if err != nil { instance.Status = "POST-INSTALL-FAILED" @@ -1249,7 +1249,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e //Plugin quits during pre-delete hooks -> This already effects the instance -> should continue the deletion go func() { log.Printf(" The plugin quits during pre-delete hook of this instance, continue pre-delete hook") - err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPreDelete, instance.PreDeleteTimeout, completedHooks, &instance) + err = hookClient.ExecHook(ctx, k8sClient, instance.Hooks, release.HookPreDelete, instance.PreDeleteTimeout, completedHooks, &instance) if err != nil { log.Printf(" Instance: %s, Error running pre-delete hooks error: %s", id, err) instance.Status = "PRE-DELETE-FAILED" @@ -1298,7 +1298,7 @@ func (v *InstanceClient) runPostDelete(ctx context.Context, k8sClient Kubernetes if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) } - err = hookClient.ExecHook(k8sClient, instance.Hooks, release.HookPostDelete, instance.PostDeleteTimeout, startIndex, instance) + err = hookClient.ExecHook(ctx, k8sClient, instance.Hooks, release.HookPostDelete, instance.PostDeleteTimeout, startIndex, instance) if err != nil { //If this case happen, user should clean the cluster log.Printf(" Instance: %s, Error running post-delete hooks error: %s", instance.ID, err) @@ -1312,7 +1312,7 @@ func (v *InstanceClient) runPostDelete(ctx context.Context, k8sClient Kubernetes } if clearDb { subscriptionClient := NewInstanceStatusSubClient() - err = subscriptionClient.Cleanup(instance.ID) + err = subscriptionClient.Cleanup(ctx, instance.ID) if err != nil { log.Print(err.Error()) } diff --git a/src/k8splugin/internal/app/subscription.go b/src/k8splugin/internal/app/subscription.go index 48ecc00a..6b4e5654 100644 --- a/src/k8splugin/internal/app/subscription.go +++ b/src/k8splugin/internal/app/subscription.go @@ -137,19 +137,19 @@ var subscriptionNotifyData = subscriptionNotifyManager{ // InstanceStatusSubManager is an interface exposes the status subscription functionality type InstanceStatusSubManager interface { - Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) - Get(instanceId, subId string) (StatusSubscription, error) - Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) - List(instanceId string) ([]StatusSubscription, error) - Delete(instanceId, subId string) error - Cleanup(instanceId string) error - RestoreWatchers() + Create(ctx context.Context, instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) + Get(ctx context.Context, instanceId, subId string) (StatusSubscription, error) + Update(ctx context.Context, instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) + List(ctx context.Context, instanceId string) ([]StatusSubscription, error) + Delete(ctx context.Context, instanceId, subId string) error + Cleanup(ctx context.Context, instanceId string) error + RestoreWatchers(ctx context.Context) } // Create Status Subscription -func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) { +func (iss *InstanceStatusSubClient) Create(ctx context.Context, instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) { - _, err := iss.Get(instanceId, subDetails.Name) + _, err := iss.Get(ctx, instanceId, subDetails.Name) if err == nil { return StatusSubscription{}, pkgerrors.New("Subscription already exists") } @@ -182,7 +182,7 @@ func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails Subscri lock.Lock() defer lock.Unlock() - err = db.DBconn.Create(context.TODO(), iss.storeName, key, iss.tagInst, sub) + err = db.DBconn.Create(ctx, iss.storeName, key, iss.tagInst, sub) if err != nil { return sub, pkgerrors.Wrap(err, "Creating Status Subscription DB Entry") } @@ -191,13 +191,13 @@ func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails Subscri "SubscriptionName": subDetails.Name, }) - go runNotifyThread(instanceId, sub.Name) + go runNotifyThread(ctx, instanceId, sub.Name) return sub, nil } // Get Status subscription -func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscription, error) { +func (iss *InstanceStatusSubClient) Get(ctx context.Context, instanceId, subId string) (StatusSubscription, error) { lock, _, _ := getSubscriptionData(instanceId) // Acquire Mutex lock.Lock() @@ -206,7 +206,7 @@ func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscri InstanceId: instanceId, SubscriptionName: subId, } - DBResp, err := db.DBconn.Read(context.TODO(), iss.storeName, key, iss.tagInst) + DBResp, err := db.DBconn.Read(ctx, iss.storeName, key, iss.tagInst) if err != nil || DBResp == nil { return StatusSubscription{}, pkgerrors.Wrap(err, "Error retrieving Subscription data") } @@ -220,8 +220,8 @@ func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscri } // Update status subscription -func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) { - sub, err := iss.Get(instanceId, subDetails.Name) +func (iss *InstanceStatusSubClient) Update(ctx context.Context, instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) { + sub, err := iss.Get(ctx, instanceId, subDetails.Name) if err != nil { return StatusSubscription{}, pkgerrors.Wrap(err, "Subscription does not exist") } @@ -248,7 +248,7 @@ func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails lock.Lock() defer lock.Unlock() - err = db.DBconn.Update(context.TODO(), iss.storeName, key, iss.tagInst, sub) + err = db.DBconn.Update(ctx, iss.storeName, key, iss.tagInst, sub) if err != nil { return sub, pkgerrors.Wrap(err, "Updating Status Subscription DB Entry") } @@ -261,14 +261,14 @@ func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails } // Get list of status subscriptions -func (iss *InstanceStatusSubClient) List(instanceId string) ([]StatusSubscription, error) { +func (iss *InstanceStatusSubClient) List(ctx context.Context, instanceId string) ([]StatusSubscription, error) { lock, _, _ := getSubscriptionData(instanceId) // Acquire Mutex lock.Lock() defer lock.Unlock() // Retrieve info about created status subscriptions - dbResp, err := db.DBconn.ReadAll(context.TODO(), iss.storeName, iss.tagInst) + dbResp, err := db.DBconn.ReadAll(ctx, iss.storeName, iss.tagInst) if err != nil { if !strings.Contains(err.Error(), "Did not find any objects with tag") { return []StatusSubscription{}, pkgerrors.Wrap(err, "Getting Status Subscription data") @@ -306,8 +306,8 @@ func (iss *InstanceStatusSubClient) List(instanceId string) ([]StatusSubscriptio } // Delete status subscription -func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error { - _, err := iss.Get(instanceId, subId) +func (iss *InstanceStatusSubClient) Delete(ctx context.Context, instanceId, subId string) error { + _, err := iss.Get(ctx, instanceId, subId) if err != nil { return pkgerrors.Wrap(err, "Subscription does not exist") } @@ -323,7 +323,7 @@ func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error { InstanceId: instanceId, SubscriptionName: subId, } - err = db.DBconn.Delete(context.TODO(), iss.storeName, key, iss.tagInst) + err = db.DBconn.Delete(ctx, iss.storeName, key, iss.tagInst) if err != nil { return pkgerrors.Wrap(err, "Removing Status Subscription in DB") } @@ -331,14 +331,14 @@ func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error { } // Cleanup status subscriptions for instance -func (iss *InstanceStatusSubClient) Cleanup(instanceId string) error { - subList, err := iss.List(instanceId) +func (iss *InstanceStatusSubClient) Cleanup(ctx context.Context, instanceId string) error { + subList, err := iss.List(ctx, instanceId) if err != nil { return err } for _, sub := range subList { - err = iss.Delete(instanceId, sub.Name) + err = iss.Delete(ctx, instanceId, sub.Name) if err != nil { log.Error("Error deleting ", log.Fields{ "error": err.Error(), @@ -350,19 +350,19 @@ func (iss *InstanceStatusSubClient) Cleanup(instanceId string) error { } // Restore status subscriptions notify threads -func (iss *InstanceStatusSubClient) RestoreWatchers() { +func (iss *InstanceStatusSubClient) RestoreWatchers(ctx context.Context) { go func() { time.Sleep(time.Second * 10) log.Info("Restoring status subscription notifications", log.Fields{}) v := NewInstanceClient() - instances, err := v.List(context.TODO(), "", "", "") + instances, err := v.List(ctx, "", "", "") if err != nil { log.Error("Error reading instance list", log.Fields{ "error": err.Error(), }) } for _, instance := range instances { - subList, err := iss.List(instance.ID) + subList, err := iss.List(ctx, instance.ID) if err != nil { log.Error("Error reading subscription list for instance", log.Fields{ "error": err.Error(), @@ -381,7 +381,7 @@ func (iss *InstanceStatusSubClient) RestoreWatchers() { }) continue } - go runNotifyThread(instance.ID, sub.Name) + go runNotifyThread(ctx, instance.ID, sub.Name) } } }() @@ -608,7 +608,7 @@ func gvkListForInstance(instance InstanceResponse, profile rb.Profile) []schema. return list } -func runNotifyThread(instanceId, subName string) { +func runNotifyThread(ctx context.Context, instanceId, subName string) { v := NewInstanceClient() iss := NewInstanceStatusSubClient() var status = InstanceStatus{ @@ -641,7 +641,7 @@ func runNotifyThread(instanceId, subName string) { break } if changeDetected || status.ResourceCount < 0 { - currentSub, err := iss.Get(instanceId, subName) + currentSub, err := iss.Get(ctx, instanceId, subName) if err != nil { log.Error("Error getting current status", log.Fields{ "error": err.Error(), @@ -653,7 +653,7 @@ func runNotifyThread(instanceId, subName string) { } else { timeInSeconds = 5 } - newStatus, err := v.Status(context.Background(), instanceId, false) + newStatus, err := v.Status(ctx, instanceId, false) if err != nil { log.Error("Error getting current status", log.Fields{ "error": err.Error(), @@ -706,7 +706,7 @@ func runNotifyThread(instanceId, subName string) { }) currentSub.LastNotifyStatus = notifyResult.result currentSub.LastNotifyTime = notifyResult.time - err = db.DBconn.Update(context.TODO(), iss.storeName, key, iss.tagInst, currentSub) + err = db.DBconn.Update(ctx, iss.storeName, key, iss.tagInst, currentSub) if err != nil { log.Error("Error updating subscription status", log.Fields{ "error": err.Error(), diff --git a/src/k8splugin/internal/healthcheck/healthcheck.go b/src/k8splugin/internal/healthcheck/healthcheck.go index 49d8f1e0..622bb2d6 100644 --- a/src/k8splugin/internal/healthcheck/healthcheck.go +++ b/src/k8splugin/internal/healthcheck/healthcheck.go @@ -19,6 +19,7 @@ import ( "strings" "sync" + "go.opentelemetry.io/otel" "helm.sh/helm/v3/pkg/release" "helm.sh/helm/v3/pkg/time" @@ -90,6 +91,8 @@ type InstanceHCOverview struct { Hooks []*helm.Hook `json:"hooks"` } +var tracer = otel.Tracer("internal/healthcheck") + func NewHCClient() *InstanceHCClient { return &InstanceHCClient{ storeName: "rbdef", @@ -102,14 +105,15 @@ func instanceMiniHCStatusFromStatus(ihcs InstanceHCStatus) InstanceMiniHCStatus } func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, error) { + ctx, span := tracer.Start(context.TODO(), "InstanceHCClient.Create") + defer span.End() //TODO Handle hook delete policies //Generate ID - id := namegenerator.Generate() + id := namegenerator.Generate(ctx) //Determine Cloud Region and namespace v := app.NewInstanceClient() - ctx := context.TODO() instance, err := v.Get(ctx, instanceId) if err != nil { return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance") @@ -202,7 +206,6 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err }) update <- true wg.Done() - return }(h.Status) } go func() { @@ -212,7 +215,6 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err "InstanceId": instanceId, }) update <- false - return }() for { select { diff --git a/src/k8splugin/internal/namegenerator/namegenerator.go b/src/k8splugin/internal/namegenerator/namegenerator.go index 856a60f6..3a6f3ef7 100644 --- a/src/k8splugin/internal/namegenerator/namegenerator.go +++ b/src/k8splugin/internal/namegenerator/namegenerator.go @@ -100,14 +100,14 @@ func (c *cache) readCacheFromDB() error { } // writeCacheToDB will update the DB with the updated cache -func (c *cache) writeCacheToDB() { +func (c *cache) writeCacheToDB(ctx context.Context) { //Update the database as well - err := db.DBconn.Update(context.TODO(), storeName, cacheKeyGlobal, tag, c.cache) + err := db.DBconn.Update(ctx, storeName, cacheKeyGlobal, tag, c.cache) if err != nil { // TODO: Replace with DBconn variable if strings.Contains(err.Error(), "Error finding master table") { - err = db.DBconn.Create(context.TODO(), storeName, cacheKeyGlobal, tag, c.cache) + err = db.DBconn.Create(ctx, storeName, cacheKeyGlobal, tag, c.cache) if err != nil { log.Println("Error creating the entry in DB. Will try later...") return @@ -119,7 +119,7 @@ func (c *cache) writeCacheToDB() { } } -func (c *cache) generateName() string { +func (c *cache) generateName(ctx context.Context) string { c.mux.Lock() defer c.mux.Unlock() @@ -137,12 +137,12 @@ func (c *cache) generateName() string { c.cache[name] = true // Update the cache and db - c.writeCacheToDB() + c.writeCacheToDB(ctx) return name } } -func (c *cache) releaseName(name string) { +func (c *cache) releaseName(ctx context.Context, name string) { if name == "" { return } @@ -155,18 +155,18 @@ func (c *cache) releaseName(name string) { c.cache[name] = false // Update the cache and db - c.writeCacheToDB() + c.writeCacheToDB(ctx) } } // Generate returns an autogenerated name -func Generate() string { +func Generate(ctx context.Context) string { - return nameCache.generateName() + return nameCache.generateName(ctx) } // Release name from cache -func Release(name string) { +func Release(ctx context.Context, name string) { - nameCache.releaseName(name) + nameCache.releaseName(ctx, name) } -- 2.16.6