From 62f11b0eef95cf7ec017d735a34f69b53e6e9c70 Mon Sep 17 00:00:00 2001 From: Fiete Ostkamp Date: Wed, 10 Dec 2025 15:46:10 +0100 Subject: [PATCH] Add spans for KubernetesClient methods - this is done to better understand where time is spend when executing requests Issue-ID: MULTICLOUD-1538 Change-Id: I7f069b3a188deb685ec595273888f5ad2f6e1054 Signed-off-by: Fiete Ostkamp --- src/k8splugin/internal/app/client.go | 70 +++++++++----- src/k8splugin/internal/app/client_test.go | 7 +- src/k8splugin/internal/app/config_backend.go | 13 +-- src/k8splugin/internal/app/hook.go | 7 +- src/k8splugin/internal/app/hook_test.go | 112 +++++++++++----------- src/k8splugin/internal/app/instance.go | 69 +++++++------ src/k8splugin/internal/app/query.go | 8 +- src/k8splugin/internal/app/subscription.go | 5 +- src/k8splugin/internal/healthcheck/healthcheck.go | 17 ++-- src/k8splugin/internal/healthcheck/hooks.go | 3 +- 10 files changed, 177 insertions(+), 134 deletions(-) diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go index b85fe564..b8f56915 100644 --- a/src/k8splugin/internal/app/client.go +++ b/src/k8splugin/internal/app/client.go @@ -242,12 +242,12 @@ func (k *KubernetesClient) WatchHookUntilReady(timeout time.Duration, ns string, } // getPodsByLabel yields status of all pods under given instance ID -func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) { +func (k *KubernetesClient) getPodsByLabel(ctx context.Context, namespace string) ([]ResourceStatus, error) { client := k.GetStandardClient().CoreV1().Pods(namespace) listOpts := metav1.ListOptions{ LabelSelector: config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID, } - podList, err := client.List(context.TODO(), listOpts) + podList, err := client.List(ctx, listOpts) if err != nil { return nil, pkgerrors.Wrap(err, "Retrieving PodList from cluster") } @@ -275,7 +275,10 @@ func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, e return resp, nil } -func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, namespace string) ([]ResourceStatus, error) { +func (k *KubernetesClient) queryResources(ctx context.Context, apiVersion, kind, labelSelector, namespace string) ([]ResourceStatus, error) { + ctx, span := tracer.Start(ctx, "KubernetesClient.queryResources") + defer span.End() + dynClient := k.GetDynamicClient() mapper := k.GetMapper() gvk := schema.FromAPIVersionAndKind(apiVersion, kind) @@ -289,12 +292,12 @@ func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, names LabelSelector: labelSelector, } var unstrList *unstructured.UnstructuredList - dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts) + dynClient.Resource(gvr).Namespace(namespace).List(ctx, opts) switch mapping.Scope.Name() { case meta.RESTScopeNameNamespace: - unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts) + unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(ctx, opts) case meta.RESTScopeNameRoot: - unstrList, err = dynClient.Resource(gvr).List(context.TODO(), opts) + unstrList, err = dynClient.Resource(gvr).List(ctx, opts) default: return nil, pkgerrors.New("Got an unknown RESTScopeName for mapping: " + gvk.String()) } @@ -312,7 +315,9 @@ func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, names } // GetResourcesStatus yields status of given generic resource -func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namespace string) (ResourceStatus, error) { +func (k *KubernetesClient) GetResourceStatus(ctx context.Context, res helm.KubernetesResource, namespace string) (ResourceStatus, error) { + ctx, span := tracer.Start(ctx, "KubernetesClient.GetResourceStatus") + defer span.End() dynClient := k.GetDynamicClient() mapper := k.GetMapper() mapping, err := mapper.RESTMapping(schema.GroupKind{ @@ -329,9 +334,9 @@ func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namesp var unstruct *unstructured.Unstructured switch mapping.Scope.Name() { case meta.RESTScopeNameNamespace: - unstruct, err = dynClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), res.Name, opts) + unstruct, err = dynClient.Resource(gvr).Namespace(namespace).Get(ctx, res.Name, opts) case meta.RESTScopeNameRoot: - unstruct, err = dynClient.Resource(gvr).Get(context.TODO(), res.Name, opts) + unstruct, err = dynClient.Resource(gvr).Get(ctx, res.Name, opts) default: return ResourceStatus{}, pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + res.GVK.String()) } @@ -345,10 +350,12 @@ func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namesp // getKubeConfig uses the connectivity client to get the kubeconfig based on the name // of the cloudregion. This is written out to a file. -func (k *KubernetesClient) getKubeConfig(cloudregion string) (string, error) { +func (k *KubernetesClient) getKubeConfig(ctx context.Context, cloudregion string) (string, error) { + ctx, span := tracer.Start(ctx, "KubernetesClient.getKubeConfig") + defer span.End() conn := connection.NewConnectionClient() - kubeConfigPath, err := conn.Download(context.TODO(), cloudregion) + kubeConfigPath, err := conn.Download(ctx, cloudregion) if err != nil { return "", pkgerrors.Wrap(err, "Downloading kubeconfig") } @@ -356,8 +363,11 @@ func (k *KubernetesClient) getKubeConfig(cloudregion string) (string, error) { return kubeConfigPath, nil } -// Init loads the Kubernetes configuation values stored into the local configuration file -func (k *KubernetesClient) Init(cloudregion string, iid string) error { +// Init loads the Kubernetes configuration values stored into the local configuration file +func (k *KubernetesClient) Init(ctx context.Context, cloudregion string, iid string) error { + ctx, span := tracer.Start(ctx, "KubernetesClient.Init") + defer span.End() + if cloudregion == "" { return pkgerrors.New("Cloudregion is empty") } @@ -368,7 +378,7 @@ func (k *KubernetesClient) Init(cloudregion string, iid string) error { k.instanceID = iid - configPath, err := k.getKubeConfig(cloudregion) + configPath, err := k.getKubeConfig(ctx, cloudregion) if err != nil { return pkgerrors.Wrap(err, "Get kubeconfig file") } @@ -416,7 +426,9 @@ func (k *KubernetesClient) Init(cloudregion string, iid string) error { return nil } -func (k *KubernetesClient) ensureNamespace(namespace string) error { +func (k *KubernetesClient) ensureNamespace(ctx context.Context, namespace string) error { + _, span := tracer.Start(ctx, "KubernetesClient.ensureNamespace") + defer span.End() pluginImpl, err := plugin.GetPluginByKind("Namespace") if err != nil { @@ -459,7 +471,9 @@ func (k *KubernetesClient) ensureNamespace(namespace string) error { return nil } -func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate, namespace string) (helm.KubernetesResource, error) { +func (k *KubernetesClient) CreateKind(ctx context.Context, resTempl helm.KubernetesResourceTemplate, namespace string) (helm.KubernetesResource, error) { + _, span := tracer.Start(ctx, "KubernetesClient.CreateKind") + defer span.End() if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) { return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists") @@ -495,8 +509,10 @@ func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate, }, nil } -func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate, +func (k *KubernetesClient) updateKind(ctx context.Context, resTempl helm.KubernetesResourceTemplate, namespace string, createIfDoNotExist bool) (helm.KubernetesResource, error) { + _, span := tracer.Start(ctx, "KubernetesClient.updateKind") + defer span.End() if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) { return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + " does not exists") @@ -541,18 +557,20 @@ func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate, }, nil } -func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesResourceTemplate, +func (k *KubernetesClient) createResources(ctx context.Context, sortedTemplates []helm.KubernetesResourceTemplate, namespace string) ([]helm.KubernetesResource, error) { + ctx, span := tracer.Start(ctx, "KubernetesClient.createResources") + defer span.End() var createdResources []helm.KubernetesResource - err := k.ensureNamespace(namespace) + err := k.ensureNamespace(ctx, namespace) if err != nil { return createdResources, pkgerrors.Wrap(err, "Creating Namespace") } for _, resTempl := range sortedTemplates { - resCreated, err := k.CreateKind(resTempl, namespace) + resCreated, err := k.CreateKind(ctx, resTempl, namespace) if err != nil { return createdResources, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK) } @@ -562,17 +580,19 @@ func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesReso return createdResources, nil } -func (k *KubernetesClient) updateResources(sortedTemplates []helm.KubernetesResourceTemplate, +func (k *KubernetesClient) updateResources(ctx context.Context, sortedTemplates []helm.KubernetesResourceTemplate, namespace string, createIfDoNotExist bool) ([]helm.KubernetesResource, error) { + ctx, span := tracer.Start(ctx, "KubernetesClient.updateResources") + defer span.End() - err := k.ensureNamespace(namespace) + err := k.ensureNamespace(ctx, namespace) if err != nil { return nil, pkgerrors.Wrap(err, "Creating Namespace") } var updatedResources []helm.KubernetesResource for _, resTempl := range sortedTemplates { - resUpdated, err := k.updateKind(resTempl, namespace, createIfDoNotExist) + resUpdated, err := k.updateKind(ctx, resTempl, namespace, createIfDoNotExist) if err != nil { return nil, pkgerrors.Wrapf(err, "Error updating kind: %+v", resTempl.GVK) } @@ -609,7 +629,9 @@ func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespac return nil } -func (k *KubernetesClient) deleteResources(resources []helm.KubernetesResource, namespace string) error { +func (k *KubernetesClient) deleteResources(ctx context.Context, resources []helm.KubernetesResource, namespace string) error { + _, deleteSpan := tracer.Start(ctx, "KubernetesClient.deleteResources") + defer deleteSpan.End() //TODO: Investigate if deletion should be in a particular order for _, res := range resources { err := k.DeleteKind(res, namespace) diff --git a/src/k8splugin/internal/app/client_test.go b/src/k8splugin/internal/app/client_test.go index f51c15fc..5d5a2ab9 100644 --- a/src/k8splugin/internal/app/client_test.go +++ b/src/k8splugin/internal/app/client_test.go @@ -14,6 +14,7 @@ limitations under the License. package app import ( + "context" "encoding/base64" "io/ioutil" "os" @@ -73,7 +74,7 @@ func TestInit(t *testing.T) { kubeClient := KubernetesClient{} // Refer to the connection via its name - err = kubeClient.Init("mock_connection", "abcdefg") + err = kubeClient.Init(context.TODO(), "mock_connection", "abcdefg") if err != nil { t.Fatalf("TestGetKubeClient returned an error (%s)", err) } @@ -120,7 +121,7 @@ func TestCreateResources(t *testing.T) { }, } - _, err := k8.createResources(data, "testnamespace") + _, err := k8.createResources(context.TODO(), data, "testnamespace") if err != nil { t.Fatalf("TestCreateResources returned an error (%s)", err) } @@ -175,7 +176,7 @@ func TestDeleteResources(t *testing.T) { }, } - err := k8.deleteResources(data, "test") + err := k8.deleteResources(context.TODO(), data, "test") if err != nil { t.Fatalf("TestCreateVNF returned an error (%s)", err) } diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go index 046ef810..454684b6 100644 --- a/src/k8splugin/internal/app/config_backend.go +++ b/src/k8splugin/internal/app/config_backend.go @@ -530,10 +530,11 @@ func scheduleResources(c chan configResourceList) { // Keep thread running log.Printf("[scheduleResources]: START thread") for { + ctx := context.TODO() data := <-c //TODO: ADD Check to see if Application running ic := NewInstanceClient() - resp, err := ic.Find(context.TODO(), data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil) + resp, err := ic.Find(ctx, data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil) if (err != nil || len(resp) == 0) && data.action != "STOP" { log.Println("Error finding a running instance. Retrying later...") data.updatedResources <- []KubernetesConfigResource{} @@ -546,7 +547,7 @@ func scheduleResources(c chan configResourceList) { resources := []KubernetesConfigResource{} for _, inst := range resp { k8sClient := KubernetesClient{} - err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) + err = k8sClient.Init(ctx, inst.Request.CloudRegion, inst.ID) if err != nil { log.Printf("Getting CloudRegion Information: %s", err.Error()) //Move onto the next cloud region @@ -554,12 +555,12 @@ func scheduleResources(c chan configResourceList) { } for _, res := range data.resourceTemplates { var resToCreateOrUpdate = []helm.KubernetesResourceTemplate{res} - resProceeded, err := k8sClient.createResources(resToCreateOrUpdate, inst.Namespace) + resProceeded, err := k8sClient.createResources(ctx, resToCreateOrUpdate, inst.Namespace) errCreate := err var status string = "" if err != nil { // assuming - the err represent the resource already exist, so going for update - resProceeded, err = k8sClient.updateResources(resToCreateOrUpdate, inst.Namespace, false) + resProceeded, err = k8sClient.updateResources(ctx, resToCreateOrUpdate, inst.Namespace, false) if err != nil { log.Printf("Error Creating resources: %s", errCreate.Error()) log.Printf("Error Updating resources: %s", err.Error()) @@ -583,7 +584,7 @@ func scheduleResources(c chan configResourceList) { log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resources) for _, inst := range resp { k8sClient := KubernetesClient{} - err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) + err = k8sClient.Init(ctx, inst.Request.CloudRegion, inst.ID) if err != nil { log.Printf("Getting CloudRegion Information: %s", err.Error()) //Move onto the next cloud region @@ -593,7 +594,7 @@ func scheduleResources(c chan configResourceList) { for _, res := range data.resources { tmpResources = append(tmpResources, res.Resource) } - err = k8sClient.deleteResources(helm.GetReverseK8sResources(tmpResources), inst.Namespace) + err = k8sClient.deleteResources(context.TODO(), helm.GetReverseK8sResources(tmpResources), inst.Namespace) if err != nil { log.Printf("Error Deleting resources: %s", err.Error()) continue diff --git a/src/k8splugin/internal/app/hook.go b/src/k8splugin/internal/app/hook.go index 9645adcd..e4a4e9d5 100644 --- a/src/k8splugin/internal/app/hook.go +++ b/src/k8splugin/internal/app/hook.go @@ -77,6 +77,7 @@ func (hc *HookClient) ExecHook( timeout int64, startIndex int, dbData *InstanceDbData) error { + ctx := context.TODO() executingHooks := hc.getHookByEvent(hs, hook) key := InstanceKey{ ID: hc.id, @@ -109,7 +110,7 @@ func (hc *HookClient) ExecHook( GVK: h.KRT.GVK, FilePath: h.KRT.FilePath, } - createdHook, err := k8sClient.CreateKind(resTempl, hc.kubeNameSpace) + createdHook, err := k8sClient.CreateKind(ctx, resTempl, hc.kubeNameSpace) if err != nil { log.Printf(" Instance: %s, Warning: %s hook %s, filePath: %s, error: %s", hc.id, hook, h.Hook.Name, h.KRT.FilePath, err) hc.deleteHookByPolicy(h, release.HookFailed, k8sClient) @@ -151,7 +152,7 @@ func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDelete } if hookHasDeletePolicy(h, policy) { log.Printf(" Instance: %s, Deleting hook %s due to %q policy", hc.id, h.Hook.Name, policy) - if errHookDelete := k8sClient.deleteResources(append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil { + if errHookDelete := k8sClient.deleteResources(context.TODO(), append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil { if strings.Contains(errHookDelete.Error(), "not found") { return nil } else { @@ -163,7 +164,7 @@ func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDelete isDeleted := false for !isDeleted { log.Printf(" Instance: %s, Waiting on deleting hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy) - if _, err := k8sClient.GetResourceStatus(rss, hc.kubeNameSpace); err != nil { + if _, err := k8sClient.GetResourceStatus(context.TODO(), rss, hc.kubeNameSpace); err != nil { if strings.Contains(err.Error(), "not found") { log.Printf(" Instance: %s, Deleted hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy) return nil diff --git a/src/k8splugin/internal/app/hook_test.go b/src/k8splugin/internal/app/hook_test.go index 9c63194e..79b6bcc2 100644 --- a/src/k8splugin/internal/app/hook_test.go +++ b/src/k8splugin/internal/app/hook_test.go @@ -14,36 +14,38 @@ limitations under the License. package app import ( + "context" "encoding/base64" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" + "io/ioutil" + "testing" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "helm.sh/helm/v3/pkg/release" "helm.sh/helm/v3/pkg/time" - "io/ioutil" "k8s.io/apimachinery/pkg/runtime/schema" - "testing" ) func generateHookList() []*helm.Hook { var hookList []*helm.Hook preInstallHook1 := helm.Hook{ Hook: release.Hook{ - Name : "preinstall1", - Kind : "Job", - Path : "", - Manifest : "", - Events : []release.HookEvent{release.HookPreInstall}, - LastRun : release.HookExecution{ + Name: "preinstall1", + Kind: "Job", + Path: "", + Manifest: "", + Events: []release.HookEvent{release.HookPreInstall}, + LastRun: release.HookExecution{ StartedAt: time.Now(), CompletedAt: time.Now(), Phase: "", }, - Weight : -5, - DeletePolicies : []release.HookDeletePolicy{}, + Weight: -5, + DeletePolicies: []release.HookDeletePolicy{}, }, - KRT: helm.KubernetesResourceTemplate{ + KRT: helm.KubernetesResourceTemplate{ GVK: schema.GroupVersionKind{ Group: "batch", Version: "v1", @@ -54,20 +56,20 @@ func generateHookList() []*helm.Hook { } preInstallHook2 := helm.Hook{ Hook: release.Hook{ - Name : "preinstall2", - Kind : "Deployment", - Path : "", - Manifest : "", - Events : []release.HookEvent{release.HookPreInstall}, - LastRun : release.HookExecution{ + Name: "preinstall2", + Kind: "Deployment", + Path: "", + Manifest: "", + Events: []release.HookEvent{release.HookPreInstall}, + LastRun: release.HookExecution{ StartedAt: time.Now(), CompletedAt: time.Now(), Phase: "", }, - Weight : 0, - DeletePolicies : []release.HookDeletePolicy{}, + Weight: 0, + DeletePolicies: []release.HookDeletePolicy{}, }, - KRT: helm.KubernetesResourceTemplate{ + KRT: helm.KubernetesResourceTemplate{ GVK: schema.GroupVersionKind{ Group: "batch", Version: "v1", @@ -78,20 +80,20 @@ func generateHookList() []*helm.Hook { } postInstallHook := helm.Hook{ Hook: release.Hook{ - Name : "postinstall", - Kind : "Job", - Path : "", - Manifest : "", - Events : []release.HookEvent{release.HookPostInstall}, - LastRun : release.HookExecution{ + Name: "postinstall", + Kind: "Job", + Path: "", + Manifest: "", + Events: []release.HookEvent{release.HookPostInstall}, + LastRun: release.HookExecution{ StartedAt: time.Now(), CompletedAt: time.Now(), Phase: "", }, - Weight : -5, - DeletePolicies : []release.HookDeletePolicy{}, + Weight: -5, + DeletePolicies: []release.HookDeletePolicy{}, }, - KRT: helm.KubernetesResourceTemplate{ + KRT: helm.KubernetesResourceTemplate{ GVK: schema.GroupVersionKind{ Group: "batch", Version: "v1", @@ -102,20 +104,20 @@ func generateHookList() []*helm.Hook { } preDeleteHook := helm.Hook{ Hook: release.Hook{ - Name : "predelete", - Kind : "Job", - Path : "", - Manifest : "", - Events : []release.HookEvent{release.HookPreDelete}, - LastRun : release.HookExecution{ + Name: "predelete", + Kind: "Job", + Path: "", + Manifest: "", + Events: []release.HookEvent{release.HookPreDelete}, + LastRun: release.HookExecution{ StartedAt: time.Now(), CompletedAt: time.Now(), Phase: "", }, - Weight : -5, - DeletePolicies : []release.HookDeletePolicy{}, + Weight: -5, + DeletePolicies: []release.HookDeletePolicy{}, }, - KRT: helm.KubernetesResourceTemplate{ + KRT: helm.KubernetesResourceTemplate{ GVK: schema.GroupVersionKind{ Group: "batch", Version: "v1", @@ -126,20 +128,20 @@ func generateHookList() []*helm.Hook { } postDeleteHook := helm.Hook{ Hook: release.Hook{ - Name : "postdelete", - Kind : "Job", - Path : "", - Manifest : "", - Events : []release.HookEvent{release.HookPostDelete}, - LastRun : release.HookExecution{ + Name: "postdelete", + Kind: "Job", + Path: "", + Manifest: "", + Events: []release.HookEvent{release.HookPostDelete}, + LastRun: release.HookExecution{ StartedAt: time.Now(), CompletedAt: time.Now(), Phase: "", }, - Weight : -5, - DeletePolicies : []release.HookDeletePolicy{}, + Weight: -5, + DeletePolicies: []release.HookDeletePolicy{}, }, - KRT: helm.KubernetesResourceTemplate{ + KRT: helm.KubernetesResourceTemplate{ GVK: schema.GroupVersionKind{ Group: "batch", Version: "v1", @@ -241,24 +243,24 @@ func TestExecHook(t *testing.T) { } k8sClient := KubernetesClient{} - err = k8sClient.Init("mock_connection", "test") + err = k8sClient.Init(context.TODO(), "mock_connection", "test") if err != nil { t.Fatal(err.Error()) } - err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall,10,0, nil) + err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, 10, 0, nil) if err != nil { t.Fatal(err.Error()) } - err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall,10,0, nil) + err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, 10, 0, nil) if err != nil { t.Fatal(err.Error()) } - err = hookClient.ExecHook(k8sClient, hookList, release.HookPreDelete,10,0, nil) + err = hookClient.ExecHook(k8sClient, hookList, release.HookPreDelete, 10, 0, nil) if err != nil { t.Fatal(err.Error()) } - err = hookClient.ExecHook(k8sClient, hookList, release.HookPostDelete,10,0, nil) + err = hookClient.ExecHook(k8sClient, hookList, release.HookPostDelete, 10, 0, nil) if err != nil { t.Fatal(err.Error()) } -} \ No newline at end of file +} diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index 94eb216a..c6a4c59b 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -28,6 +28,9 @@ import ( "strings" "time" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.7.0" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" @@ -134,6 +137,8 @@ type InstanceKey struct { ID string `json:"id"` } +var tracer = otel.Tracer("internal/app") + // We will use json marshalling to convert to string to // preserve the underlying structure. func (dk InstanceKey) String() string { @@ -284,7 +289,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st } k8sClient := KubernetesClient{} - err = k8sClient.Init(i.CloudRegion, finalId) + err = k8sClient.Init(ctx, i.CloudRegion, finalId) if err != nil { namegenerator.Release(generatedId) return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") @@ -326,7 +331,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st PostUpgradeTimeout: hookTimeoutInfo.postUpgradeTimeout, } - err = k8sClient.ensureNamespace(profile.Namespace) + err = k8sClient.ensureNamespace(ctx, profile.Namespace) if err != nil { namegenerator.Release(generatedId) return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace") @@ -335,7 +340,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st key := InstanceKey{ ID: finalId, } - err = db.DBconn.Create(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Create(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { namegenerator.Release(generatedId) return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry") @@ -343,7 +348,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st if len(crdList) > 0 { log.Printf("Pre-Installing CRDs") - _, err = k8sClient.createResources(crdList, profile.Namespace) + _, err = k8sClient.createResources(ctx, crdList, profile.Namespace) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Pre-Installing CRDs") @@ -355,7 +360,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, hookTimeoutInfo.preInstallTimeOut, 0, &dbData) if err != nil { log.Printf("Error running preinstall hooks for release %s, Error: %s. Stop here", releaseName, err) - err2 := db.DBconn.Delete(context.TODO(), v.storeName, key, v.tagInst) + err2 := db.DBconn.Delete(ctx, v.storeName, key, v.tagInst) if err2 != nil { log.Printf("Error cleaning failed instance in DB, please check DB.") } else { @@ -366,9 +371,9 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st } dbData.Status = "CREATING" - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { - err2 := db.DBconn.Delete(context.TODO(), v.storeName, key, v.tagInst) + err2 := db.DBconn.Delete(ctx, v.storeName, key, v.tagInst) if err2 != nil { log.Printf("Delete Instance DB Entry for release %s has error.", releaseName) } else { @@ -378,15 +383,15 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st } //Main rss creation is supposed to be very quick -> no need to support recover for main rss - createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace) + createdResources, err := k8sClient.createResources(ctx, sortedTemplates, profile.Namespace) if err != nil { if len(createdResources) > 0 { log.Printf("[Instance] Reverting created resources on Error: %s", err.Error()) - k8sClient.deleteResources(helm.GetReverseK8sResources(createdResources), profile.Namespace) + k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(createdResources), profile.Namespace) } log.Printf(" Instance: %s, Main rss are failed, skip post-install and remove instance in DB", finalId) //main rss creation failed -> remove instance in DB - err2 := db.DBconn.Delete(context.TODO(), v.storeName, key, v.tagInst) + err2 := db.DBconn.Delete(ctx, v.storeName, key, v.tagInst) if err2 != nil { log.Printf("Delete Instance DB Entry for release %s has error.", releaseName) } else { @@ -397,7 +402,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st dbData.Status = "CREATED" dbData.Resources = createdResources - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry") } @@ -423,14 +428,14 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st } else { dbData.Status = "DONE" } - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", releaseName) } }() } else { dbData.Status = "DONE" - err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData) + err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData) if err != nil { log.Printf("Update Instance DB Entry for release %s has error.", releaseName) } @@ -538,7 +543,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques } k8sClient := KubernetesClient{} - err = k8sClient.Init(i.CloudRegion, id) + err = k8sClient.Init(ctx, i.CloudRegion, id) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") } @@ -579,7 +584,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques PostUpgradeTimeout: hookTimeoutInfo.postUpgradeTimeout, } - err = k8sClient.ensureNamespace(profile.Namespace) + err = k8sClient.ensureNamespace(ctx, profile.Namespace) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace") } @@ -591,7 +596,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques if len(crdList) > 0 { log.Printf("Pre-Installing CRDs") - _, err = k8sClient.createResources(crdList, profile.Namespace) + _, err = k8sClient.createResources(ctx, crdList, profile.Namespace) if err != nil { return InstanceResponse{}, pkgerrors.Wrap(err, "Pre-Installing CRDs") @@ -613,7 +618,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry") } - upgradedResources, err := k8sClient.updateResources(sortedTemplates, profile.Namespace, true) + upgradedResources, err := k8sClient.updateResources(ctx, sortedTemplates, profile.Namespace, true) if err != nil { log.Printf(" Instance: %s, Main rss are failed, skip post-upgrade", id) return InstanceResponse{}, pkgerrors.Wrap(err, "Upgrade Kubernetes Resources") @@ -628,8 +633,8 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques exists = true break } else { - status1, err := k8sClient.GetResourceStatus(res, profile.Namespace) - status2, err2 := k8sClient.GetResourceStatus(pastRes, currentInstance.Namespace) + status1, err := k8sClient.GetResourceStatus(ctx, res, profile.Namespace) + status2, err2 := k8sClient.GetResourceStatus(ctx, pastRes, currentInstance.Namespace) if err == nil && err2 == nil && status1.Value() == status2.Value() { //only when resource is namespace-less exists = true @@ -643,7 +648,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques } } - err = k8sClient.deleteResources(helm.GetReverseK8sResources(resToDelete), currentInstance.Namespace) + err = k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(resToDelete), currentInstance.Namespace) configClient := NewConfigClient() configList, err := configClient.List(id) @@ -814,6 +819,12 @@ func (v *InstanceClient) Query(ctx context.Context, id, apiVersion, kind, name, // Status returns the status for the instance func (v *InstanceClient) Status(ctx context.Context, id string, checkReady bool) (InstanceStatus, error) { + ctx, statusSpan := tracer.Start(ctx, "InstanceClient.Status") + statusSpan.SetAttributes( + attribute.String(string(semconv.CodeFunctionKey), "Status"), + attribute.String(string(semconv.CodeNamespaceKey), "k8splugin/internal/app.InstanceClient"), + ) + defer statusSpan.End() //Read the status from the DB key := InstanceKey{ ID: id, @@ -836,7 +847,7 @@ func (v *InstanceClient) Status(ctx context.Context, id string, checkReady bool) } k8sClient := KubernetesClient{} - err = k8sClient.Init(resResp.Request.CloudRegion, id) + err = k8sClient.Init(ctx, resResp.Request.CloudRegion, id) if err != nil { return InstanceStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") } @@ -847,7 +858,7 @@ func (v *InstanceClient) Status(ctx context.Context, id string, checkReady bool) } cumulatedErrorMsg := make([]string, 0) - podsStatus, err := k8sClient.getPodsByLabel(resResp.Namespace) + podsStatus, err := k8sClient.getPodsByLabel(ctx, resResp.Namespace) if err != nil { cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) } @@ -861,7 +872,7 @@ Main: continue Main //Don't double check pods if someone decided to define pod explicitly in helm chart } } - status, err := k8sClient.GetResourceStatus(oneResource, resResp.Namespace) + status, err := k8sClient.GetResourceStatus(ctx, oneResource, resResp.Namespace) if err != nil { cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) isReady = false @@ -1093,7 +1104,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error { } k8sClient := KubernetesClient{} - err = k8sClient.Init(inst.Request.CloudRegion, inst.ID) + err = k8sClient.Init(ctx, inst.Request.CloudRegion, inst.ID) if err != nil { return pkgerrors.Wrap(err, "Getting CloudRegion Information") } @@ -1131,7 +1142,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error { return pkgerrors.Wrap(err, "Cleanup Config Resources") } - err = k8sClient.deleteResources(helm.GetReverseK8sResources(inst.Resources), inst.Namespace) + err = k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(inst.Resources), inst.Namespace) if err != nil { return pkgerrors.Wrap(err, "Deleting Instance Resources") } @@ -1206,7 +1217,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e return nil } k8sClient := KubernetesClient{} - err = k8sClient.Init(instance.Request.CloudRegion, id) + err = k8sClient.Init(ctx, instance.Request.CloudRegion, id) if err != nil { log.Printf(" Error getting CloudRegion %s", instance.Request.CloudRegion) return nil @@ -1245,7 +1256,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e return } - err = k8sClient.deleteResources(helm.GetReverseK8sResources(instance.Resources), instance.Namespace) + err = k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(instance.Resources), instance.Namespace) if err != nil { log.Printf(" Error running deleting instance resources, error: %s", err) return @@ -1322,13 +1333,13 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H GVK: h.KRT.GVK, Name: h.Hook.Name, } - if _, err := k8sClient.GetResourceStatus(res, hookClient.kubeNameSpace); err == nil { + if _, err := k8sClient.GetResourceStatus(context.TODO(), res, hookClient.kubeNameSpace); err == nil { remainHookRss = append(remainHookRss, res) log.Printf(" Rss %s will be deleted.", res.Name) } } if len(remainHookRss) > 0 { - err = k8sClient.deleteResources(remainHookRss, hookClient.kubeNameSpace) + err = k8sClient.deleteResources(context.TODO(), remainHookRss, hookClient.kubeNameSpace) if err != nil { log.Printf("Error cleaning Hook Rss, please do it manually if needed. Error: %s", err.Error()) } diff --git a/src/k8splugin/internal/app/query.go b/src/k8splugin/internal/app/query.go index 6a1b3516..2c1b5466 100644 --- a/src/k8splugin/internal/app/query.go +++ b/src/k8splugin/internal/app/query.go @@ -19,6 +19,8 @@ package app import ( + "context" + pkgerrors "github.com/pkg/errors" ) @@ -55,14 +57,14 @@ func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labe //Read the status from the DD k8sClient := KubernetesClient{} - err := k8sClient.Init(cloudRegion, "dummy") //we don't care about instance id in this request + err := k8sClient.Init(context.TODO(), cloudRegion, "dummy") //we don't care about instance id in this request if err != nil { return QueryStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information") } var resourcesStatus []ResourceStatus if name != "" { - resList, err := k8sClient.queryResources(apiVersion, kind, labels, namespace) + resList, err := k8sClient.queryResources(context.TODO(), apiVersion, kind, labels, namespace) if err != nil { return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources") } @@ -79,7 +81,7 @@ func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labe resourcesStatus = resList } } else { - resList, err := k8sClient.queryResources(apiVersion, kind, labels, namespace) + resList, err := k8sClient.queryResources(context.TODO(), apiVersion, kind, labels, namespace) if err != nil { return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources") } diff --git a/src/k8splugin/internal/app/subscription.go b/src/k8splugin/internal/app/subscription.go index aa66033f..48ecc00a 100644 --- a/src/k8splugin/internal/app/subscription.go +++ b/src/k8splugin/internal/app/subscription.go @@ -394,7 +394,8 @@ func (iss *InstanceStatusSubClient) refreshWatchers(instanceId, subId string) er }) v := NewInstanceClient() k8sClient := KubernetesClient{} - instance, err := v.Get(context.TODO(), instanceId) + ctx := context.TODO() + instance, err := v.Get(ctx, instanceId) if err != nil { return pkgerrors.Wrap(err, "Cannot get instance for notify thread") } @@ -403,7 +404,7 @@ func (iss *InstanceStatusSubClient) refreshWatchers(instanceId, subId string) er if err != nil { return pkgerrors.Wrap(err, "Unable to find Profile instance status") } - err = k8sClient.Init(instance.Request.CloudRegion, instanceId) + err = k8sClient.Init(ctx, instance.Request.CloudRegion, instanceId) if err != nil { return pkgerrors.Wrap(err, "Cannot set k8s client for instance") } diff --git a/src/k8splugin/internal/healthcheck/healthcheck.go b/src/k8splugin/internal/healthcheck/healthcheck.go index 9bc1a877..49d8f1e0 100644 --- a/src/k8splugin/internal/healthcheck/healthcheck.go +++ b/src/k8splugin/internal/healthcheck/healthcheck.go @@ -109,13 +109,14 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err //Determine Cloud Region and namespace v := app.NewInstanceClient() - instance, err := v.Get(context.TODO(), instanceId) + ctx := context.TODO() + instance, err := v.Get(ctx, instanceId) if err != nil { return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance") } k8sClient := app.KubernetesClient{} - err = k8sClient.Init(instance.Request.CloudRegion, instanceId) + err = k8sClient.Init(ctx, instance.Request.CloudRegion, instanceId) if err != nil { return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient") } @@ -153,7 +154,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err for _, h := range hooks { h.Status.StartedAt = time.Now() - kr, err := k8sClient.CreateKind(h.Definition.KRT, instance.Namespace) + kr, err := k8sClient.CreateKind(ctx, h.Definition.KRT, instance.Namespace) if err != nil { // Set status fields h.Status.Status = release.HookPhaseFailed @@ -163,7 +164,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err retErr := "Starting hook " + h.Status.Name // Dump to DB - err = db.DBconn.Create(context.TODO(), ihc.storeName, key, ihc.tagInst, ihcs) + err = db.DBconn.Create(ctx, ihc.storeName, key, ihc.tagInst, ihcs) if err != nil { retErr = retErr + " and couldn't save to DB" } @@ -175,7 +176,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err h.Status.KR = kr } } - err = db.DBconn.Create(context.TODO(), ihc.storeName, key, ihc.tagInst, ihcs) + err = db.DBconn.Create(ctx, ihc.storeName, key, ihc.tagInst, ihcs) if err != nil { return instanceMiniHCStatusFromStatus(ihcs), pkgerrors.Wrap(err, "Creating Instance DB Entry") @@ -222,7 +223,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err "Reason": map[bool]string{true: "Hook finished", false: "All hooks finished"}[b], }) if b { //Some hook finished - need to update DB - err = db.DBconn.Update(context.TODO(), ihc.storeName, key, ihc.tagInst, ihcs) + err = db.DBconn.Update(ctx, ihc.storeName, key, ihc.tagInst, ihcs) if err != nil { log.Error("Couldn't update database", log.Fields{ "Store": ihc.storeName, @@ -249,7 +250,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err } } ihcs.Status = finalResult - err = db.DBconn.Update(context.TODO(), ihc.storeName, key, ihc.tagInst, ihcs) + err = db.DBconn.Update(ctx, ihc.storeName, key, ihc.tagInst, ihcs) if err != nil { log.Error("Couldn't update database", log.Fields{ "Store": ihc.storeName, @@ -295,7 +296,7 @@ func (ihc InstanceHCClient) Delete(instanceId, healthcheckId string) error { return pkgerrors.Wrap(err, "Error querying Healthcheck status") } k8sClient := app.KubernetesClient{} - err = k8sClient.Init(instance.Request.CloudRegion, instanceId) + err = k8sClient.Init(context.TODO(), instance.Request.CloudRegion, instanceId) if err != nil { return pkgerrors.Wrap(err, "Preparing KubeClient") } diff --git a/src/k8splugin/internal/healthcheck/hooks.go b/src/k8splugin/internal/healthcheck/hooks.go index 0b1be9ac..f62043bf 100644 --- a/src/k8splugin/internal/healthcheck/hooks.go +++ b/src/k8splugin/internal/healthcheck/hooks.go @@ -14,6 +14,7 @@ limitations under the License. package healthcheck import ( + "context" "time" "github.com/onap/multicloud-k8s/src/k8splugin/internal/app" @@ -67,7 +68,7 @@ func getHookState(hookStatus HookStatus, k8sClient app.KubernetesClient, namespa } for { - res, err := k8sClient.GetResourceStatus(hookStatus.KR, namespace) + res, err := k8sClient.GetResourceStatus(context.TODO(), hookStatus.KR, namespace) if err != nil { log.Error("Unable to check Resource Status", log.Fields{ "Resource": hookStatus.KR, -- 2.16.6