Context propagation for query handler 25/142725/1
authorFiete Ostkamp <fiete.ostkamp@telekom.de>
Thu, 11 Dec 2025 09:40:13 +0000 (10:40 +0100)
committerFiete Ostkamp <fiete.ostkamp@telekom.de>
Thu, 11 Dec 2025 09:49:40 +0000 (10:49 +0100)
- connect context in QueryManager
- make sure context is passed everywhere in instance.go

Issue-ID: MULTICLOUD-1538
Change-Id: If14d30aacc4cae569f2bdeb6b51be37948c27da9
Signed-off-by: Fiete Ostkamp <fiete.ostkamp@telekom.de>
src/k8splugin/api/queryhandler.go
src/k8splugin/internal/app/config_backend.go
src/k8splugin/internal/app/instance.go
src/k8splugin/internal/app/query.go

index 497767b..cfde356 100644 (file)
@@ -52,7 +52,7 @@ func (i queryHandler) queryHandler(w http.ResponseWriter, r *http.Request) {
                return
        }
        // instance id is irrelevant here
-       resp, err := i.client.Query(namespace, cloudRegion, apiVersion, kind, name, labels)
+       resp, err := i.client.Query(r.Context(), namespace, cloudRegion, apiVersion, kind, name, labels)
        if err != nil {
                log.Error("Error getting Query results", log.Fields{
                        "error":       err,
index 13fbc4e..b49f070 100644 (file)
@@ -594,7 +594,7 @@ func scheduleResources(c chan configResourceList) {
                                for _, res := range data.resources {
                                        tmpResources = append(tmpResources, res.Resource)
                                }
-                               err = k8sClient.deleteResources(context.TODO(), helm.GetReverseK8sResources(tmpResources), inst.Namespace)
+                               err = k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(tmpResources), inst.Namespace)
                                if err != nil {
                                        log.Printf("Error Deleting resources: %s", err.Error())
                                        continue
index 4b116f0..c7a7fd3 100644 (file)
@@ -463,7 +463,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
        key := InstanceKey{
                ID: id,
        }
-       value, err := db.DBconn.Read(context.TODO(), v.storeName, key, v.tagInst)
+       value, err := db.DBconn.Read(ctx, v.storeName, key, v.tagInst)
        if err != nil {
                return InstanceResponse{}, pkgerrors.Wrap(err, "Upgrade Instance")
        }
@@ -496,26 +496,26 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
        if currentInstance.Request.CloudRegion != u.CloudRegion {
                newInstance, err := v.Create(ctx, i, "")
                if err == nil {
-                       err = v.Delete(context.TODO(), id)
+                       err = v.Delete(ctx, id)
                        if err == nil {
                                newInstanceDb, _ := v.GetFull(ctx, newInstance.ID)
                                oldKey := InstanceKey{
                                        ID: newInstance.ID,
                                }
-                               err2 := db.DBconn.Delete(context.TODO(), v.storeName, oldKey, v.tagInst)
+                               err2 := db.DBconn.Delete(ctx, v.storeName, oldKey, v.tagInst)
                                if err2 != nil {
                                        log.Printf("Delete of the temporal instance from the DB has failed %s", err2.Error())
                                }
                                namegenerator.Release(newInstance.ID)
                                newInstanceDb.ID = id
                                newInstance.ID = id
-                               err = db.DBconn.Create(context.TODO(), v.storeName, key, v.tagInst, newInstanceDb)
+                               err = db.DBconn.Create(ctx, v.storeName, key, v.tagInst, newInstanceDb)
                                if err != nil {
                                        return newInstance, pkgerrors.Wrap(err, "Create Instance DB Entry after update failed")
                                }
                                return newInstance, nil
                        } else {
-                               err2 := v.Delete(context.TODO(), newInstance.ID)
+                               err2 := v.Delete(ctx, newInstance.ID)
                                if err2 != nil {
                                        log.Printf("Delete of the instance from the new region failed with error %s", err2.Error())
                                }
@@ -589,7 +589,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
                return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace")
        }
 
-       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
        if err != nil {
                return InstanceResponse{}, pkgerrors.Wrap(err, "Updating Instance DB Entry")
        }
@@ -613,7 +613,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
        }
 
        dbData.Status = "UPGRADING"
-       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
        if err != nil {
                return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
        }
@@ -664,7 +664,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
 
        dbData.Status = "UPGRADED"
        dbData.Resources = upgradedResources
-       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
        if err != nil {
                return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
        }
@@ -690,14 +690,14 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
                        } else {
                                dbData.Status = "DONE"
                        }
-                       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+                       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
                        if err != nil {
                                log.Printf("Update Instance DB Entry for release %s has error.", releaseName)
                        }
                }()
        } else {
                dbData.Status = "DONE"
-               err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+               err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
                if err != nil {
                        log.Printf("Update Instance DB Entry for release %s has error.", releaseName)
                }
@@ -711,7 +711,7 @@ func (v *InstanceClient) GetFull(ctx context.Context, id string) (InstanceDbData
        key := InstanceKey{
                ID: id,
        }
-       value, err := db.DBconn.Read(context.TODO(), v.storeName, key, v.tagInst)
+       value, err := db.DBconn.Read(ctx, v.storeName, key, v.tagInst)
        if err != nil {
                return InstanceDbData{}, pkgerrors.Wrap(err, "Get Instance")
        }
@@ -757,7 +757,7 @@ func (v *InstanceClient) Get(ctx context.Context, id string) (InstanceResponse,
        key := InstanceKey{
                ID: id,
        }
-       value, err := db.DBconn.Read(context.TODO(), v.storeName, key, v.tagInst)
+       value, err := db.DBconn.Read(ctx, v.storeName, key, v.tagInst)
        if err != nil {
                return InstanceResponse{}, pkgerrors.Wrap(err, "Get Instance")
        }
@@ -783,7 +783,7 @@ func (v *InstanceClient) Query(ctx context.Context, id, apiVersion, kind, name,
        key := InstanceKey{
                ID: id,
        }
-       value, err := db.DBconn.Read(context.TODO(), v.storeName, key, v.tagInst)
+       value, err := db.DBconn.Read(ctx, v.storeName, key, v.tagInst)
        if err != nil {
                return InstanceStatus{}, pkgerrors.Wrap(err, "Get Instance")
        }
@@ -804,7 +804,7 @@ func (v *InstanceClient) Query(ctx context.Context, id, apiVersion, kind, name,
                labels = labels + labelValue
        }
 
-       resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels)
+       resources, err := queryClient.Query(ctx, resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels)
        if err != nil {
                return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources")
        }
@@ -896,7 +896,7 @@ Main:
                queryClient := NewQueryClient()
                labelValue := config.GetConfiguration().KubernetesLabelName + "=" + id
                for _, extraType := range profile.ExtraResourceTypes {
-                       queryStatus, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, extraType.GroupVersion().Identifier(), extraType.Kind, "", labelValue)
+                       queryStatus, err := queryClient.Query(ctx, resResp.Namespace, resResp.Request.CloudRegion, extraType.GroupVersion().Identifier(), extraType.Kind, "", labelValue)
                        if err != nil {
                                return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources")
                        }
@@ -996,7 +996,7 @@ func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient K
 // Empty string returns all
 func (v *InstanceClient) List(ctx context.Context, rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) {
 
-       dbres, err := db.DBconn.ReadAll(context.TODO(), v.storeName, v.tagInst)
+       dbres, err := db.DBconn.ReadAll(ctx, v.storeName, v.tagInst)
        if err != nil || len(dbres) == 0 {
                return []InstanceMiniResponse{}, pkgerrors.Wrap(err, "Listing Instances")
        }
@@ -1083,6 +1083,8 @@ func (v *InstanceClient) Find(ctx context.Context, rbName string, version string
 
 // Delete the Instance from database
 func (v *InstanceClient) Delete(ctx context.Context, id string) error {
+       ctx, span := tracer.Start(ctx, "InstanceClient.Delete")
+       defer span.End()
        inst, err := v.GetFull(ctx, id)
        if err != nil {
                return pkgerrors.Wrap(err, "Error getting Instance")
@@ -1149,7 +1151,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error {
        if len(hookClient.getHookByEvent(inst.Hooks, release.HookPostDelete)) != 0 {
                go func() {
                        inst.HookProgress = ""
-                       if err := v.runPostDelete(k8sClient, hookClient, &inst, 0, true); err != nil {
+                       if err := v.runPostDelete(ctx, k8sClient, hookClient, &inst, 0, true); err != nil {
                                log.Print(err.Error())
                        }
                }()
@@ -1171,6 +1173,8 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error {
 
 // Continue the instantiation
 func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) error {
+       ctx, span := tracer.Start(ctx, "InstanceClient.RecoverCreateOrDelete")
+       defer span.End()
        instance, err := v.GetFull(ctx, id)
        if err != nil {
                return pkgerrors.Wrap(err, "Error getting instance "+id+", skip this instance.  Error detail")
@@ -1189,7 +1193,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
        log.Printf("  Resolving template for release %s", instance.Request.ReleaseName)
        _, _, hookList, _, _ := rb.NewProfileClient().Resolve(instance.Request.RBName, instance.Request.RBVersion, instance.Request.ProfileName, overrideValues, instance.Request.ReleaseName)
        instance.Hooks = hookList
-       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance)
+       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance)
        if err != nil {
                return pkgerrors.Wrap(err, "Update Instance DB Entry")
        }
@@ -1236,7 +1240,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
                        } else {
                                instance.Status = "DONE"
                        }
-                       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance)
+                       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance)
                        if err != nil {
                                log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
                        }
@@ -1249,7 +1253,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
                        if err != nil {
                                log.Printf("  Instance: %s, Error running pre-delete hooks error: %s", id, err)
                                instance.Status = "PRE-DELETE-FAILED"
-                               err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance)
+                               err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance)
                                if err != nil {
                                        log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
                                }
@@ -1264,7 +1268,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
                        //will not delete the instance in Db to avoid error when SO call delete again and there is not instance in DB
                        //the instance in DB will be deleted when SO call delete again.
                        instance.HookProgress = ""
-                       if err := v.runPostDelete(k8sClient, hookClient, &instance, 0, false); err != nil {
+                       if err := v.runPostDelete(ctx, k8sClient, hookClient, &instance, 0, false); err != nil {
                                log.Print(err.Error())
                        }
                }()
@@ -1272,7 +1276,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
                //Plugin quits during post-delete hooks -> continue
                go func() {
                        log.Printf("  The plugin quits during post-delete hook of this instance, continue post-delete hook")
-                       if err := v.runPostDelete(k8sClient, hookClient, &instance, completedHooks, true); err != nil {
+                       if err := v.runPostDelete(ctx, k8sClient, hookClient, &instance, completedHooks, true); err != nil {
                                log.Print(err.Error())
                        }
                }()
@@ -1283,12 +1287,14 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
        return nil
 }
 
-func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *HookClient, instance *InstanceDbData, startIndex int, clearDb bool) error {
+func (v *InstanceClient) runPostDelete(ctx context.Context, k8sClient KubernetesClient, hookClient *HookClient, instance *InstanceDbData, startIndex int, clearDb bool) error {
+       ctx, span := tracer.Start(ctx, "InstanceClient.runPostDelete")
+       defer span.End()
        key := InstanceKey{
                ID: instance.ID,
        }
        instance.Status = "POST-DELETE"
-       err := db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance)
+       err := db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance)
        if err != nil {
                log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
        }
@@ -1297,7 +1303,7 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H
                //If this case happen, user should clean the cluster
                log.Printf("  Instance: %s, Error running post-delete hooks error: %s", instance.ID, err)
                instance.Status = "POST-DELETE-FAILED"
-               err2 := db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance)
+               err2 := db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance)
                if err2 != nil {
                        log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
                        return pkgerrors.Wrap(err2, "Delete Instance DB Entry")
@@ -1310,14 +1316,14 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H
                if err != nil {
                        log.Print(err.Error())
                }
-               err = db.DBconn.Delete(context.TODO(), v.storeName, key, v.tagInst)
+               err = db.DBconn.Delete(ctx, v.storeName, key, v.tagInst)
                if err != nil {
                        log.Printf("Delete Instance DB Entry for release %s has error.", instance.ReleaseName)
                        return pkgerrors.Wrap(err, "Delete Instance DB Entry")
                }
        } else {
                instance.Status = "DELETED"
-               err := db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, instance)
+               err := db.DBconn.Update(ctx, v.storeName, key, v.tagInst, instance)
                if err != nil {
                        log.Printf("Update Instance DB Entry for release %s has error.", instance.ReleaseName)
                        return pkgerrors.Wrap(err, "Update Instance DB Entry")
@@ -1333,13 +1339,13 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H
                                GVK:  h.KRT.GVK,
                                Name: h.Hook.Name,
                        }
-                       if _, err := k8sClient.GetResourceStatus(context.TODO(), res, hookClient.kubeNameSpace); err == nil {
+                       if _, err := k8sClient.GetResourceStatus(ctx, res, hookClient.kubeNameSpace); err == nil {
                                remainHookRss = append(remainHookRss, res)
                                log.Printf("  Rss %s will be deleted.", res.Name)
                        }
                }
                if len(remainHookRss) > 0 {
-                       err = k8sClient.deleteResources(context.TODO(), remainHookRss, hookClient.kubeNameSpace)
+                       err = k8sClient.deleteResources(ctx, remainHookRss, hookClient.kubeNameSpace)
                        if err != nil {
                                log.Printf("Error cleaning Hook Rss, please do it manually if needed. Error: %s", err.Error())
                        }
index 2c1b546..0b830ba 100644 (file)
@@ -32,7 +32,7 @@ type QueryStatus struct {
 
 // QueryManager is an interface exposes the instantiation functionality
 type QueryManager interface {
-       Query(namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error)
+       Query(ctx context.Context, namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error)
 }
 
 // QueryClient implements the InstanceManager interface
@@ -52,19 +52,19 @@ func NewQueryClient() *QueryClient {
 }
 
 // Query returns state of instance's filtered resources
-func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error) {
+func (v *QueryClient) Query(ctx context.Context, namespace, cloudRegion, apiVersion, kind, name, labels string) (QueryStatus, error) {
 
        //Read the status from the DD
 
        k8sClient := KubernetesClient{}
-       err := k8sClient.Init(context.TODO(), cloudRegion, "dummy") //we don't care about instance id in this request
+       err := k8sClient.Init(ctx, cloudRegion, "dummy") //we don't care about instance id in this request
        if err != nil {
                return QueryStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
        }
 
        var resourcesStatus []ResourceStatus
        if name != "" {
-               resList, err := k8sClient.queryResources(context.TODO(), apiVersion, kind, labels, namespace)
+               resList, err := k8sClient.queryResources(ctx, apiVersion, kind, labels, namespace)
                if err != nil {
                        return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources")
                }
@@ -81,7 +81,7 @@ func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labe
                        resourcesStatus = resList
                }
        } else {
-               resList, err := k8sClient.queryResources(context.TODO(), apiVersion, kind, labels, namespace)
+               resList, err := k8sClient.queryResources(ctx, apiVersion, kind, labels, namespace)
                if err != nil {
                        return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources")
                }