From 489d3c012c8994c8a257e492659f035999cd5d9c Mon Sep 17 00:00:00 2001 From: Fiete Ostkamp Date: Tue, 9 Dec 2025 14:43:18 +0100 Subject: [PATCH] Context propagation for all InstanceManager methods - connect context for the rest of the InstanceHandler methods Issue-ID: MULTICLOUD-1538 Change-Id: I5c9a96cc922c3eb795e988bda0cddf30b5d0cbb1 Signed-off-by: Fiete Ostkamp --- src/k8splugin/api/brokerhandler.go | 7 ++-- src/k8splugin/api/instancehandler.go | 12 +++---- src/k8splugin/api/instancehandler_test.go | 10 +++--- src/k8splugin/internal/app/config_backend.go | 17 ++++----- src/k8splugin/internal/app/instance.go | 44 +++++++++++------------ src/k8splugin/internal/app/instance_test.go | 18 +++++----- src/k8splugin/internal/app/subscription.go | 4 +-- src/k8splugin/internal/healthcheck/healthcheck.go | 6 ++-- 8 files changed, 60 insertions(+), 58 deletions(-) diff --git a/src/k8splugin/api/brokerhandler.go b/src/k8splugin/api/brokerhandler.go index e95c1c89..0278f3ac 100644 --- a/src/k8splugin/api/brokerhandler.go +++ b/src/k8splugin/api/brokerhandler.go @@ -14,6 +14,7 @@ limitations under the License. package api import ( + "context" "encoding/json" "io" "net/http" @@ -175,7 +176,7 @@ func (b brokerInstanceHandler) createHandler(w http.ResponseWriter, r *http.Requ log.Info("Instance API Payload", log.Fields{ "payload": instReq, }) - resp, err := b.client.Create(instReq, "") + resp, err := b.client.Create(r.Context(), instReq, "") if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -205,7 +206,7 @@ func (b brokerInstanceHandler) getHandler(w http.ResponseWriter, r *http.Request vars := mux.Vars(r) instanceID := vars["instID"] - resp, err := b.client.Get(instanceID) + resp, err := b.client.Get(context.TODO(), instanceID) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -234,7 +235,7 @@ func (b brokerInstanceHandler) findHandler(w http.ResponseWriter, r *http.Reques vars := mux.Vars(r) //name is an alias for stack-name from the so adapter name := vars["name"] - responses, _ := b.client.Find("", "", "", map[string]string{"stack-name": name}) + responses, _ := b.client.Find(r.Context(), "", "", "", map[string]string{"stack-name": name}) brokerResp := brokerGETResponse{ TemplateType: "heat", diff --git a/src/k8splugin/api/instancehandler.go b/src/k8splugin/api/instancehandler.go index 4e67acf0..b5ef6d39 100644 --- a/src/k8splugin/api/instancehandler.go +++ b/src/k8splugin/api/instancehandler.go @@ -95,7 +95,7 @@ func (i instanceHandler) createHandler(w http.ResponseWriter, r *http.Request) { return } - resp, err := i.client.Create(resource, "") + resp, err := i.client.Create(r.Context(), resource, "") if err != nil { log.Error("Error Creating Resource", log.Fields{ "error": err, @@ -149,7 +149,7 @@ func (i instanceHandler) upgradeHandler(w http.ResponseWriter, r *http.Request) return } - resp, err := i.client.Upgrade(id, resource) + resp, err := i.client.Upgrade(r.Context(), id, resource) if err != nil { log.Error("Error Upgrading Resource", log.Fields{ "error": err, @@ -180,9 +180,9 @@ func (i instanceHandler) getHandler(w http.ResponseWriter, r *http.Request) { var err error if r.URL.Query().Get("full") == "true" { - resp, err = i.client.GetFull(id) + resp, err = i.client.GetFull(r.Context(), id) } else { - resp, err = i.client.Get(id) + resp, err = i.client.Get(r.Context(), id) } if err != nil { @@ -250,7 +250,7 @@ func (i instanceHandler) queryHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "Missing Kind mandatory parameter", http.StatusBadRequest) return } - resp, err := i.client.Query(id, apiVersion, kind, name, labels) + resp, err := i.client.Query(r.Context(), id, apiVersion, kind, name, labels) if err != nil { log.Error("Error getting Query results", log.Fields{ "error": err, @@ -284,7 +284,7 @@ func (i instanceHandler) listHandler(w http.ResponseWriter, r *http.Request) { rbVersion := r.FormValue("rb-version") profileName := r.FormValue("profile-name") - resp, err := i.client.List(rbName, rbVersion, profileName) + resp, err := i.client.List(r.Context(), rbName, rbVersion, profileName) if err != nil { log.Error("Error listing instances", log.Fields{ "error": err, diff --git a/src/k8splugin/api/instancehandler_test.go b/src/k8splugin/api/instancehandler_test.go index af867e44..f1ef8808 100644 --- a/src/k8splugin/api/instancehandler_test.go +++ b/src/k8splugin/api/instancehandler_test.go @@ -49,7 +49,7 @@ type mockInstanceClient struct { err error } -func (m *mockInstanceClient) Create(inp app.InstanceRequest, newId string) (app.InstanceResponse, error) { +func (m *mockInstanceClient) Create(ctx context.Context, inp app.InstanceRequest, newId string) (app.InstanceResponse, error) { if m.err != nil { return app.InstanceResponse{}, m.err } @@ -57,7 +57,7 @@ func (m *mockInstanceClient) Create(inp app.InstanceRequest, newId string) (app. return m.items[0], nil } -func (m *mockInstanceClient) Get(id string) (app.InstanceResponse, error) { +func (m *mockInstanceClient) Get(ctx context.Context, id string) (app.InstanceResponse, error) { if m.err != nil { return app.InstanceResponse{}, m.err } @@ -65,7 +65,7 @@ func (m *mockInstanceClient) Get(id string) (app.InstanceResponse, error) { return m.items[0], nil } -func (m *mockInstanceClient) Query(id, apiVersion, kind, name, labels string) (app.InstanceStatus, error) { +func (m *mockInstanceClient) Query(ctx context.Context, id, apiVersion, kind, name, labels string) (app.InstanceStatus, error) { if m.err != nil { return app.InstanceStatus{}, m.err } @@ -81,7 +81,7 @@ func (m *mockInstanceClient) Status(ctx context.Context, id string, checkReady b return m.statusItem, nil } -func (m *mockInstanceClient) List(rbname, rbversion, profilename string) ([]app.InstanceMiniResponse, error) { +func (m *mockInstanceClient) List(ctx context.Context, rbname, rbversion, profilename string) ([]app.InstanceMiniResponse, error) { if m.err != nil { return []app.InstanceMiniResponse{}, m.err } @@ -89,7 +89,7 @@ func (m *mockInstanceClient) List(rbname, rbversion, profilename string) ([]app. return m.miniitems, nil } -func (m *mockInstanceClient) Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]app.InstanceMiniResponse, error) { +func (m *mockInstanceClient) Find(ctx context.Context, rbName string, ver string, profile string, labelKeys map[string]string) ([]app.InstanceMiniResponse, error) { if m.err != nil { return nil, m.err } diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go index 80230896..a550b8de 100644 --- a/src/k8splugin/internal/app/config_backend.go +++ b/src/k8splugin/internal/app/config_backend.go @@ -19,6 +19,7 @@ package app import ( "bytes" + "context" "encoding/json" "io/ioutil" "log" @@ -36,7 +37,7 @@ import ( pkgerrors "github.com/pkg/errors" ) -//ConfigStore contains the values that will be stored in the database +// ConfigStore contains the values that will be stored in the database type configVersionDBContent struct { ConfigNew Config `json:"config-new"` ConfigPrev Config `json:"config-prev"` @@ -44,13 +45,13 @@ type configVersionDBContent struct { Resources []KubernetesConfigResource `json:"resources"` } -//ConfigStore to Store the Config +// ConfigStore to Store the Config type ConfigStore struct { instanceID string configName string } -//ConfigVersionStore to Store the Versions of the Config +// ConfigVersionStore to Store the Versions of the Config type ConfigVersionStore struct { instanceID string configName string @@ -234,7 +235,7 @@ func (c ConfigStore) deleteConfig() (Config, error) { return configPrev, nil } -//Cleanup stored data in etcd before instance is being deleted +// Cleanup stored data in etcd before instance is being deleted func (c ConfigVersionStore) cleanupIstanceTags(configName string) error { rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID) @@ -532,7 +533,7 @@ func scheduleResources(c chan configResourceList) { data := <-c //TODO: ADD Check to see if Application running ic := NewInstanceClient() - resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil) + resp, err := ic.Find(context.TODO(), data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil) if (err != nil || len(resp) == 0) && data.action != "STOP" { log.Println("Error finding a running instance. Retrying later...") data.updatedResources <- []KubernetesConfigResource{} @@ -610,8 +611,8 @@ func scheduleResources(c chan configResourceList) { log.Printf("[scheduleResources]: STOP thread") } -//Resolve returns the path where the helm chart merged with -//configuration overrides resides. +// Resolve returns the path where the helm chart merged with +// configuration overrides resides. var resolve = func(rbName, rbVersion, profileName, instanceId string, p Config, releaseName string) (configResourceList, error) { var resTemplates []helm.KubernetesResourceTemplate @@ -659,7 +660,7 @@ var resolve = func(rbName, rbVersion, profileName, instanceId string, p Config, } ic := NewInstanceClient() - instance, err := ic.Get(instanceId) + instance, err := ic.Get(context.TODO(), instanceId) if err != nil { return configResourceList{}, pkgerrors.Wrap(err, "Getting Instance") } diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index cd06a436..94eb216a 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -117,16 +117,16 @@ type InstanceStatus struct { // InstanceManager is an interface exposes the instantiation functionality type InstanceManager interface { - Create(i InstanceRequest, newId string) (InstanceResponse, error) - Upgrade(id string, u UpgradeRequest) (InstanceResponse, error) - Get(id string) (InstanceResponse, error) - GetFull(id string) (InstanceDbData, error) + Create(ctx context.Context, i InstanceRequest, newId string) (InstanceResponse, error) + Upgrade(ctx context.Context, id string, u UpgradeRequest) (InstanceResponse, error) + Get(ctx context.Context, id string) (InstanceResponse, error) + GetFull(ctx context.Context, id string) (InstanceDbData, error) Status(ctx context.Context, id string, checkReady bool) (InstanceStatus, error) - Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) - List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) - Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error) + Query(ctx context.Context, id, apiVersion, kind, name, labels string) (InstanceStatus, error) + List(ctx context.Context, rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) + Find(ctx context.Context, rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error) Delete(ctx context.Context, id string) error - RecoverCreateOrDelete(id string) error + RecoverCreateOrDelete(ctx context.Context, id string) error } // InstanceKey is used as the primary key in the db @@ -167,7 +167,7 @@ func NewInstanceClient() *InstanceClient { // Simplified function to retrieve model data from instance ID func resolveModelFromInstance(instanceID string) (rbName, rbVersion, profileName, releaseName string, err error) { v := NewInstanceClient() - resp, err := v.Get(instanceID) + resp, err := v.Get(context.TODO(), instanceID) if err != nil { return "", "", "", "", pkgerrors.Wrap(err, "Getting instance") } @@ -250,7 +250,7 @@ func getOverridesAndHookInfo(i InstanceRequest) ([]string, HookTimeoutInfo, erro } // Create an instance of rb on the cluster in the database -func (v *InstanceClient) Create(i InstanceRequest, newId string) (InstanceResponse, error) { +func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId string) (InstanceResponse, error) { // Name is required if i.RBName == "" || i.RBVersion == "" || i.ProfileName == "" || i.CloudRegion == "" { return InstanceResponse{}, @@ -454,7 +454,7 @@ func upgradeRequestToInstanceRequest(instance InstanceResponse, u UpgradeRequest } // Upgrade an instance of rb on the cluster in the database -func (v *InstanceClient) Upgrade(id string, u UpgradeRequest) (InstanceResponse, error) { +func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeRequest) (InstanceResponse, error) { key := InstanceKey{ ID: id, } @@ -489,11 +489,11 @@ func (v *InstanceClient) Upgrade(id string, u UpgradeRequest) (InstanceResponse, } if currentInstance.Request.CloudRegion != u.CloudRegion { - newInstance, err := v.Create(i, "") + newInstance, err := v.Create(ctx, i, "") if err == nil { err = v.Delete(context.TODO(), id) if err == nil { - newInstanceDb, err := v.GetFull(newInstance.ID) + newInstanceDb, err := v.GetFull(ctx, newInstance.ID) oldKey := InstanceKey{ ID: newInstance.ID, } @@ -702,7 +702,7 @@ func (v *InstanceClient) Upgrade(id string, u UpgradeRequest) (InstanceResponse, } // Get returns the full instance for corresponding ID -func (v *InstanceClient) GetFull(id string) (InstanceDbData, error) { +func (v *InstanceClient) GetFull(ctx context.Context, id string) (InstanceDbData, error) { key := InstanceKey{ ID: id, } @@ -748,7 +748,7 @@ func (v *InstanceClient) GetFull(id string) (InstanceDbData, error) { } // Get returns the instance for corresponding ID -func (v *InstanceClient) Get(id string) (InstanceResponse, error) { +func (v *InstanceClient) Get(ctx context.Context, id string) (InstanceResponse, error) { key := InstanceKey{ ID: id, } @@ -771,7 +771,7 @@ func (v *InstanceClient) Get(id string) (InstanceResponse, error) { } // Query returns state of instance's filtered resources -func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) { +func (v *InstanceClient) Query(ctx context.Context, id, apiVersion, kind, name, labels string) (InstanceStatus, error) { queryClient := NewQueryClient() //Read the status from the DB @@ -983,7 +983,7 @@ func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient K // List returns the instance for corresponding ID // Empty string returns all -func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) { +func (v *InstanceClient) List(ctx context.Context, rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) { dbres, err := db.DBconn.ReadAll(context.TODO(), v.storeName, v.tagInst) if err != nil || len(dbres) == 0 { @@ -1039,12 +1039,12 @@ func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]Instance // If profile is empty, it will return all instances for a given rbName+version // If labelKeys are provided, the results are filtered based on that. // It is an AND operation for labelkeys. -func (v *InstanceClient) Find(rbName string, version string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error) { +func (v *InstanceClient) Find(ctx context.Context, rbName string, version string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error) { if rbName == "" && len(labelKeys) == 0 { return []InstanceMiniResponse{}, pkgerrors.New("rbName or labelkeys is required and cannot be empty") } - responses, err := v.List(rbName, version, profile) + responses, err := v.List(ctx, rbName, version, profile) if err != nil { return []InstanceMiniResponse{}, pkgerrors.Wrap(err, "Listing Instances") } @@ -1072,7 +1072,7 @@ func (v *InstanceClient) Find(rbName string, version string, profile string, lab // Delete the Instance from database func (v *InstanceClient) Delete(ctx context.Context, id string) error { - inst, err := v.GetFull(id) + inst, err := v.GetFull(ctx, id) if err != nil { return pkgerrors.Wrap(err, "Error getting Instance") } @@ -1159,8 +1159,8 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error { } // Continue the instantiation -func (v *InstanceClient) RecoverCreateOrDelete(id string) error { - instance, err := v.GetFull(id) +func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) error { + instance, err := v.GetFull(ctx, id) if err != nil { return pkgerrors.Wrap(err, "Error getting instance "+id+", skip this instance. Error detail") } diff --git a/src/k8splugin/internal/app/instance_test.go b/src/k8splugin/internal/app/instance_test.go index 611c31e9..70d858f3 100644 --- a/src/k8splugin/internal/app/instance_test.go +++ b/src/k8splugin/internal/app/instance_test.go @@ -174,7 +174,7 @@ func TestInstanceCreate(t *testing.T) { CloudRegion: "mock_connection", } - ir, err := ic.Create(input, "") + ir, err := ic.Create(context.TODO(), input, "") if err != nil { t.Fatalf("TestInstanceCreate returned an error (%s)", err) } @@ -265,7 +265,7 @@ func TestInstanceGet(t *testing.T) { } ic := NewInstanceClient() id := "HaKpys8e" - data, err := ic.Get(id) + data, err := ic.Get(context.TODO(), id) if err != nil { t.Fatalf("TestInstanceGet returned an error (%s)", err) } @@ -314,7 +314,7 @@ func TestInstanceGet(t *testing.T) { ic := NewInstanceClient() id := "non-existing" - _, err := ic.Get(id) + _, err := ic.Get(context.TODO(), id) if err == nil { t.Fatal("Expected error, got pass", err) } @@ -474,7 +474,7 @@ func TestInstanceFind(t *testing.T) { } ic := NewInstanceClient() name := "test-rbdef" - data, err := ic.Find(name, "", "", nil) + data, err := ic.Find(context.TODO(), name, "", "", nil) if err != nil { t.Fatalf("TestInstanceFind returned an error (%s)", err) } @@ -521,7 +521,7 @@ func TestInstanceFind(t *testing.T) { labels := map[string]string{ "vf_module_id": "test-vf-module-id", } - data, err := ic.Find(name, "", "", labels) + data, err := ic.Find(context.TODO(), name, "", "", labels) if err != nil { t.Fatalf("TestInstanceFind returned an error (%s)", err) } @@ -575,7 +575,7 @@ func TestInstanceFind(t *testing.T) { } ic := NewInstanceClient() name := "test-rbdef" - data, err := ic.Find(name, "v1", "", nil) + data, err := ic.Find(context.TODO(), name, "v1", "", nil) if err != nil { t.Fatalf("TestInstanceFind returned an error (%s)", err) } @@ -619,7 +619,7 @@ func TestInstanceFind(t *testing.T) { } ic := NewInstanceClient() name := "test-rbdef" - data, err := ic.Find(name, "v1", "profile1", nil) + data, err := ic.Find(context.TODO(), name, "v1", "profile1", nil) if err != nil { t.Fatalf("TestInstanceFind returned an error (%s)", err) } @@ -678,7 +678,7 @@ func TestInstanceFind(t *testing.T) { ic := NewInstanceClient() name := "non-existing" - resp, _ := ic.Find(name, "", "", nil) + resp, _ := ic.Find(context.TODO(), name, "", "", nil) if len(resp) != 0 { t.Fatalf("Expected 0 responses, but got %d", len(resp)) } @@ -881,7 +881,7 @@ func TestInstanceWithHookCreate(t *testing.T) { CloudRegion: "mock_connection", } - ir, err := ic.Create(input, "") + ir, err := ic.Create(context.TODO(), input, "") if err != nil { t.Fatalf("TestInstanceWithHookCreate returned an error (%s)", err) } diff --git a/src/k8splugin/internal/app/subscription.go b/src/k8splugin/internal/app/subscription.go index e353ca82..aa66033f 100644 --- a/src/k8splugin/internal/app/subscription.go +++ b/src/k8splugin/internal/app/subscription.go @@ -355,7 +355,7 @@ func (iss *InstanceStatusSubClient) RestoreWatchers() { time.Sleep(time.Second * 10) log.Info("Restoring status subscription notifications", log.Fields{}) v := NewInstanceClient() - instances, err := v.List("", "", "") + instances, err := v.List(context.TODO(), "", "", "") if err != nil { log.Error("Error reading instance list", log.Fields{ "error": err.Error(), @@ -394,7 +394,7 @@ func (iss *InstanceStatusSubClient) refreshWatchers(instanceId, subId string) er }) v := NewInstanceClient() k8sClient := KubernetesClient{} - instance, err := v.Get(instanceId) + instance, err := v.Get(context.TODO(), instanceId) if err != nil { return pkgerrors.Wrap(err, "Cannot get instance for notify thread") } diff --git a/src/k8splugin/internal/healthcheck/healthcheck.go b/src/k8splugin/internal/healthcheck/healthcheck.go index 328bdcab..9bc1a877 100644 --- a/src/k8splugin/internal/healthcheck/healthcheck.go +++ b/src/k8splugin/internal/healthcheck/healthcheck.go @@ -109,7 +109,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err //Determine Cloud Region and namespace v := app.NewInstanceClient() - instance, err := v.Get(instanceId) + instance, err := v.Get(context.TODO(), instanceId) if err != nil { return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance") } @@ -286,7 +286,7 @@ func (ihc InstanceHCClient) Get(instanceId, healthcheckId string) (InstanceHCSta func (ihc InstanceHCClient) Delete(instanceId, healthcheckId string) error { key := HealthcheckKey{instanceId, healthcheckId} v := app.NewInstanceClient() - instance, err := v.Get(instanceId) + instance, err := v.Get(context.TODO(), instanceId) if err != nil { return pkgerrors.Wrap(err, "Getting instance") } @@ -324,7 +324,7 @@ func (ihc InstanceHCClient) List(instanceId string) (InstanceHCOverview, error) // Retrieve info about available hooks v := app.NewInstanceClient() - instance, err := v.Get(instanceId) + instance, err := v.Get(context.TODO(), instanceId) if err != nil { return ihco, pkgerrors.Wrap(err, "Getting Instance data") } -- 2.16.6