From d31d441631744bc3601a4c1194025ca9a113c9fc Mon Sep 17 00:00:00 2001 From: Fiete Ostkamp Date: Thu, 11 Dec 2025 10:40:13 +0100 Subject: [PATCH] Context propagation for query handler - connect context in QueryManager - make sure context is passed everywhere in instance.go Issue-ID: MULTICLOUD-1538 Change-Id: If14d30aacc4cae569f2bdeb6b51be37948c27da9 Signed-off-by: Fiete Ostkamp --- src/k8splugin/api/queryhandler.go | 2 +- src/k8splugin/internal/app/config_backend.go | 2 +- src/k8splugin/internal/app/instance.go | 64 +++++++++++++++------------- src/k8splugin/internal/app/query.go | 10 ++--- 4 files changed, 42 insertions(+), 36 deletions(-) diff --git a/src/k8splugin/api/queryhandler.go b/src/k8splugin/api/queryhandler.go index 497767b0..cfde3569 100644 --- a/src/k8splugin/api/queryhandler.go +++ b/src/k8splugin/api/queryhandler.go @@ -52,7 +52,7 @@ func (i queryHandler) queryHandler(w http.ResponseWriter, r *http.Request) { return } // instance id is irrelevant here - resp, err := i.client.Query(namespace, cloudRegion, apiVersion, kind, name, labels) + resp, err := i.client.Query(r.Context(), namespace, cloudRegion, apiVersion, kind, name, labels) if err != nil { log.Error("Error getting Query results", log.Fields{ "error": err, diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go index 13fbc4e2..b49f0708 100644 --- a/src/k8splugin/internal/app/config_backend.go +++ b/src/k8splugin/internal/app/config_backend.go @@ -594,7 +594,7 @@ func scheduleResources(c chan configResourceList) { for _, res := range data.resources { tmpResources = append(tmpResources, res.Resource) } - err = k8sClient.deleteResources(context.TODO(), helm.GetReverseK8sResources(tmpResources), inst.Namespace) + err = k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(tmpResources), inst.Namespace) if err != nil { log.Printf("Error Deleting resources: %s", err.Error()) continue diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index 4b116f0a..c7a7fd35 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -463,7 +463,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques key := InstanceKey{ ID: id, } - value, err := db.DBconn.Read(context.TODO(), v.storeName, key, v.tagInst) + value, err := db.DBconn.Read(ctx, v.storeName, key, v.tagInst) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Upgrade Instance") } @@ -496,26 +496,26 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques if currentInstance.Request.CloudRegion != u.CloudRegion { newInstance, err := v.Create(ctx, i, "") if err == nil { - err = v.Delete(context.TODO(), id) + err = v.Delete(ctx, id) if err == nil { newInstanceDb, _ := v.GetFull(ctx, newInstance.ID) oldKey := InstanceKey{ ID: newInstance.ID, } - err2 := db.DBconn.Delete(context.TODO(), v.storeName, oldKey, v.tagInst) + err2 := db.DBconn.Delete(ctx, v.storeName, oldKey, v.tagInst) if err2 != nil { log.Printf("Delete of the temporal instance from the DB has failed %s", err2.Error()) } namegenerator.Release(newInstance.ID) newInstanceDb.ID = id newInstance.ID = id - err = db.DBconn.Create(context.TODO(), v.storeName, key, v.tagInst, newInstanceDb) + err = db.DBconn.Create(ctx, v.storeName, key, v.tagInst, newInstanceDb) if err != nil { return newInstance, pkgerrors.Wrap(err, "Create Instance DB Entry after update failed") } return newInstance, nil } else { - err2 := v.Delete(context.TODO(), newInstance.ID) + err2 := v.Delete(ctx, newInstance.ID) if err2 != nil { log.Printf("Delete of the instance from the new region failed with error %s", err2.Error()) } @@ -589,7 +589,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace") } - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Updating Instance DB Entry") } @@ -613,7 +613,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques } dbData.Status = "UPGRADING" - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry") } @@ -664,7 +664,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques dbData.Status = "UPGRADED" dbData.Resources = upgradedResources - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry") } @@ -690,14 +690,14 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques } else { dbData.Status = "DONE" } - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", releaseName) } }() } else { dbData.Status = "DONE" - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", releaseName) } @@ -711,7 +711,7 @@ func (v *InstanceClient) GetFull(ctx context.Context, id string) (InstanceDbData key := InstanceKey{ ID: id, } - value, err := db.DBconn.Read(context.TODO(), v.storeName, key, v.tagInst) + value, err := db.DBconn.Read(ctx, v.storeName, key, v.tagInst) if err != nil { return InstanceDbData{}, pkgerrors.Wrap(err, "Get Instance") } @@ -757,7 +757,7 @@ func (v *InstanceClient) Get(ctx context.Context, id string) (InstanceResponse, key := InstanceKey{ ID: id, } - value, err := db.DBconn.Read(context.TODO(), v.storeName, key, v.tagInst) + value, err := db.DBconn.Read(ctx, v.storeName, key, v.tagInst) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Get Instance") } @@ -783,7 +783,7 @@ func (v *InstanceClient) Query(ctx context.Context, id, apiVersion, kind, name, key := InstanceKey{ ID: id, } - value, err := db.DBconn.Read(context.TODO(), v.storeName, key, v.tagInst) + value, err := db.DBconn.Read(ctx, v.storeName, key, v.tagInst) if err != nil { return InstanceStatus{}, pkgerrors.Wrap(err, "Get Instance") } @@ -804,7 +804,7 @@ func (v *InstanceClient) Query(ctx context.Context, id, apiVersion, kind, name, labels = labels + labelValue } - resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels) + resources, err := queryClient.Query(ctx, resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels) if err != nil { return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources") } @@ -896,7 +896,7 @@ Main: queryClient := NewQueryClient() labelValue := config.GetConfiguration().KubernetesLabelName + "=" + id for _, extraType := range profile.ExtraResourceTypes { - queryStatus, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, extraType.GroupVersion().Identifier(), extraType.Kind, "", labelValue) + queryStatus, err := queryClient.Query(ctx, resResp.Namespace, resResp.Request.CloudRegion, extraType.GroupVersion().Identifier(), extraType.Kind, "", labelValue) if err != nil { return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources") } @@ -996,7 +996,7 @@ func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient K // Empty string returns all func (v *InstanceClient) List(ctx context.Context, rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) { - dbres, err := db.DBconn.ReadAll(context.TODO(), v.storeName, v.tagInst) + dbres, err := db.DBconn.ReadAll(ctx, v.storeName, v.tagInst) if err != nil || len(dbres) == 0 { return []InstanceMiniResponse{}, pkgerrors.Wrap(err, "Listing Instances") } @@ -1083,6 +1083,8 @@ func (v *InstanceClient) Find(ctx context.Context, rbName string, version string // Delete the Instance from database func (v *InstanceClient) Delete(ctx context.Context, id string) error { + ctx, span := tracer.Start(ctx, "InstanceClient.Delete") + defer span.End() inst, err := v.GetFull(ctx, id) if err != nil { return pkgerrors.Wrap(err, "Error getting Instance") @@ -1149,7 +1151,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error { if len(hookClient.getHookByEvent(inst.Hooks, release.HookPostDelete)) != 0 { go func() { inst.HookProgress = "" - if err := v.runPostDelete(k8sClient, hookClient, &inst, 0, true); err != nil { + if err := v.runPostDelete(ctx, k8sClient, hookClient, &inst, 0, true); err != nil { log.Print(err.Error()) } }() @@ -1171,6 +1173,8 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error { // Continue the instantiation func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) error { + ctx, span := tracer.Start(ctx, "InstanceClient.RecoverCreateOrDelete") + defer span.End() instance, err := v.GetFull(ctx, id) if err != nil { return pkgerrors.Wrap(err, "Error getting instance "+id+", skip this instance. Error detail") @@ -1189,7 +1193,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e log.Printf(" Resolving template for release %s", instance.Request.ReleaseName) _, _, hookList, _, _ := rb.NewProfileClient().Resolve(instance.Request.RBName, instance.Request.RBVersion, instance.Request.ProfileName, overrideValues, instance.Request.ReleaseName) instance.Hooks = hookList - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance) if err != nil { return pkgerrors.Wrap(err, "Update Instance DB Entry") } @@ -1236,7 +1240,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e } else { instance.Status = "DONE" } - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance) if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) } @@ -1249,7 +1253,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e if err != nil { log.Printf(" Instance: %s, Error running pre-delete hooks error: %s", id, err) instance.Status = "PRE-DELETE-FAILED" - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance) if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) } @@ -1264,7 +1268,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e //will not delete the instance in Db to avoid error when SO call delete again and there is not instance in DB //the instance in DB will be deleted when SO call delete again. instance.HookProgress = "" - if err := v.runPostDelete(k8sClient, hookClient, &instance, 0, false); err != nil { + if err := v.runPostDelete(ctx, k8sClient, hookClient, &instance, 0, false); err != nil { log.Print(err.Error()) } }() @@ -1272,7 +1276,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e //Plugin quits during post-delete hooks -> continue go func() { log.Printf(" The plugin quits during post-delete hook of this instance, continue post-delete hook") - if err := v.runPostDelete(k8sClient, hookClient, &instance, completedHooks, true); err != nil { + if err := v.runPostDelete(ctx, k8sClient, hookClient, &instance, completedHooks, true); err != nil { log.Print(err.Error()) } }() @@ -1283,12 +1287,14 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e return nil } -func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *HookClient, instance *InstanceDbData, startIndex int, clearDb bool) error { +func (v *InstanceClient) runPostDelete(ctx context.Context, k8sClient KubernetesClient, hookClient *HookClient, instance *InstanceDbData, startIndex int, clearDb bool) error { + ctx, span := tracer.Start(ctx, "InstanceClient.runPostDelete") + defer span.End() key := InstanceKey{ ID: instance.ID, } instance.Status = "POST-DELETE" - err := db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance) + err := db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance) if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) } @@ -1297,7 +1303,7 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H //If this case happen, user should clean the cluster log.Printf(" Instance: %s, Error running post-delete hooks error: %s", instance.ID, err) instance.Status = "POST-DELETE-FAILED" - err2 := db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance) + err2 := db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance) if err2 != nil { log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) return pkgerrors.Wrap(err2, "Delete Instance DB Entry") @@ -1310,14 +1316,14 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H if err != nil { log.Print(err.Error()) } - err = db.DBconn.Delete(context.TODO(), v.storeName, key, v.tagInst) + err = db.DBconn.Delete(ctx, v.storeName, key, v.tagInst) if err != nil { log.Printf("Delete Instance DB Entry for release %s has error.", instance.ReleaseName) return pkgerrors.Wrap(err, "Delete Instance DB Entry") } } else { instance.Status = "DELETED" - err := db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance) + err := db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance) if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName) return pkgerrors.Wrap(err, "Update Instance DB Entry") @@ -1333,13 +1339,13 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H GVK: h.KRT.GVK, Name: h.Hook.Name, } - if _, err := k8sClient.GetResourceStatus(context.TODO(), res, hookClient.kubeNameSpace); err == nil { + if _, err := k8sClient.GetResourceStatus(ctx, res, hookClient.kubeNameSpace); err == nil { remainHookRss = append(remainHookRss, res) log.Printf(" Rss %s will be deleted.", res.Name) } } if len(remainHookRss) > 0 { - err = k8sClient.deleteResources(context.TODO(), remainHookRss, hookClient.kubeNameSpace) + err = k8sClient.deleteResources(ctx, remainHookRss, hookClient.kubeNameSpace) if err != nil { log.Printf("Error cleaning Hook Rss, please do it manually if needed. Error: %s", err.Error()) } diff --git a/src/k8splugin/internal/app/query.go b/src/k8splugin/internal/app/query.go index 2c1b5466..0b830ba5 100644 --- a/src/k8splugin/internal/app/query.go +++ b/src/k8splugin/internal/app/query.go @@ -32,7 +32,7 @@ type QueryStatus struct { // QueryManager is an interface exposes the instantiation functionality type QueryManager interface { - Query(namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error) + Query(ctx context.Context, namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error) } // QueryClient implements the InstanceManager interface @@ -52,19 +52,19 @@ func NewQueryClient() *QueryClient { } // Query returns state of instance's filtered resources -func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error) { +func (v *QueryClient) Query(ctx context.Context, namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error) { //Read the status from the DD k8sClient := KubernetesClient{} - err := k8sClient.Init(context.TODO(), cloudRegion, "dummy") //we don't care about instance id in this request + err := k8sClient.Init(ctx, cloudRegion, "dummy") //we don't care about instance id in this request if err != nil { return QueryStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") } var resourcesStatus []ResourceStatus if name != "" { - resList, err := k8sClient.queryResources(context.TODO(), apiVersion, kind, labels, namespace) + resList, err := k8sClient.queryResources(ctx, apiVersion, kind, labels, namespace) if err != nil { return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources") } @@ -81,7 +81,7 @@ func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labe resourcesStatus = resList } } else { - resList, err := k8sClient.queryResources(context.TODO(), apiVersion, kind, labels, namespace) + resList, err := k8sClient.queryResources(ctx, apiVersion, kind, labels, namespace) if err != nil { return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources") } -- 2.16.6