Add spans for KubernetesClient methods
author Fiete Ostkamp <fiete.ostkamp@telekom.de>
Wed, 10 Dec 2025 14:46:10 +0000 (15:46 +0100)
committer Fiete Ostkamp <fiete.ostkamp@telekom.de>
Wed, 10 Dec 2025 15:07:41 +0000 (16:07 +0100)
- this is done to better understand where time is spent when executing requests
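- as an illustration, the span-wrapping pattern applied to each method looks
  roughly like the sketch below (exampleTracedMethod is a placeholder name,
  not a method added by this change; the package-level tracer mirrors the
  declaration this change adds in instance.go):

      package app

      import (
          "context"

          "go.opentelemetry.io/otel"
      )

      // tracer mirrors the package-level declaration added in instance.go.
      var tracer = otel.Tracer("internal/app")

      // exampleTracedMethod is hypothetical; it only illustrates the wrapping
      // applied to the KubernetesClient methods in this change.
      func (k *KubernetesClient) exampleTracedMethod(ctx context.Context, namespace string) error {
          // Start a child span and pass the derived ctx to downstream
          // Kubernetes API calls in place of context.TODO().
          ctx, span := tracer.Start(ctx, "KubernetesClient.exampleTracedMethod")
          defer span.End()

          _ = ctx // e.g. client.List(ctx, opts) or dynClient.Resource(gvr).Get(ctx, name, opts)
          return nil
      }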

Issue-ID: MULTICLOUD-1538
Change-Id: I7f069b3a188deb685ec595273888f5ad2f6e1054
Signed-off-by: Fiete Ostkamp <fiete.ostkamp@telekom.de>
src/k8splugin/internal/app/client.go
src/k8splugin/internal/app/client_test.go
src/k8splugin/internal/app/config_backend.go
src/k8splugin/internal/app/hook.go
src/k8splugin/internal/app/hook_test.go
src/k8splugin/internal/app/instance.go
src/k8splugin/internal/app/query.go
src/k8splugin/internal/app/subscription.go
src/k8splugin/internal/healthcheck/healthcheck.go
src/k8splugin/internal/healthcheck/hooks.go

diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go
index b85fe56..b8f5691 100644
@@ -242,12 +242,12 @@ func (k *KubernetesClient) WatchHookUntilReady(timeout time.Duration, ns string,
 }
 
 // getPodsByLabel yields status of all pods under given instance ID
-func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) {
+func (k *KubernetesClient) getPodsByLabel(ctx context.Context, namespace string) ([]ResourceStatus, error) {
        client := k.GetStandardClient().CoreV1().Pods(namespace)
        listOpts := metav1.ListOptions{
                LabelSelector: config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID,
        }
-       podList, err := client.List(context.TODO(), listOpts)
+       podList, err := client.List(ctx, listOpts)
        if err != nil {
                return nil, pkgerrors.Wrap(err, "Retrieving PodList from cluster")
        }
@@ -275,7 +275,10 @@ func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, e
        return resp, nil
 }
 
-func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, namespace string) ([]ResourceStatus, error) {
+func (k *KubernetesClient) queryResources(ctx context.Context, apiVersion, kind, labelSelector, namespace string) ([]ResourceStatus, error) {
+       ctx, span := tracer.Start(ctx, "KubernetesClient.queryResources")
+       defer span.End()
+
        dynClient := k.GetDynamicClient()
        mapper := k.GetMapper()
        gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
@@ -289,12 +292,12 @@ func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, names
                LabelSelector: labelSelector,
        }
        var unstrList *unstructured.UnstructuredList
-       dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts)
+       dynClient.Resource(gvr).Namespace(namespace).List(ctx, opts)
        switch mapping.Scope.Name() {
        case meta.RESTScopeNameNamespace:
-               unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts)
+               unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(ctx, opts)
        case meta.RESTScopeNameRoot:
-               unstrList, err = dynClient.Resource(gvr).List(context.TODO(), opts)
+               unstrList, err = dynClient.Resource(gvr).List(ctx, opts)
        default:
                return nil, pkgerrors.New("Got an unknown RESTScopeName for mapping: " + gvk.String())
        }
@@ -312,7 +315,9 @@ func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, names
 }
 
 // GetResourcesStatus yields status of given generic resource
-func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namespace string) (ResourceStatus, error) {
+func (k *KubernetesClient) GetResourceStatus(ctx context.Context, res helm.KubernetesResource, namespace string) (ResourceStatus, error) {
+       ctx, span := tracer.Start(ctx, "KubernetesClient.GetResourceStatus")
+       defer span.End()
        dynClient := k.GetDynamicClient()
        mapper := k.GetMapper()
        mapping, err := mapper.RESTMapping(schema.GroupKind{
@@ -329,9 +334,9 @@ func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namesp
        var unstruct *unstructured.Unstructured
        switch mapping.Scope.Name() {
        case meta.RESTScopeNameNamespace:
-               unstruct, err = dynClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), res.Name, opts)
+               unstruct, err = dynClient.Resource(gvr).Namespace(namespace).Get(ctx, res.Name, opts)
        case meta.RESTScopeNameRoot:
-               unstruct, err = dynClient.Resource(gvr).Get(context.TODO(), res.Name, opts)
+               unstruct, err = dynClient.Resource(gvr).Get(ctx, res.Name, opts)
        default:
                return ResourceStatus{}, pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + res.GVK.String())
        }
@@ -345,10 +350,12 @@ func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namesp
 
 // getKubeConfig uses the connectivity client to get the kubeconfig based on the name
 // of the cloudregion. This is written out to a file.
-func (k *KubernetesClient) getKubeConfig(cloudregion string) (string, error) {
+func (k *KubernetesClient) getKubeConfig(ctx context.Context, cloudregion string) (string, error) {
+       ctx, span := tracer.Start(ctx, "KubernetesClient.getKubeConfig")
+       defer span.End()
 
        conn := connection.NewConnectionClient()
-       kubeConfigPath, err := conn.Download(context.TODO(), cloudregion)
+       kubeConfigPath, err := conn.Download(ctx, cloudregion)
        if err != nil {
                return "", pkgerrors.Wrap(err, "Downloading kubeconfig")
        }
@@ -356,8 +363,11 @@ func (k *KubernetesClient) getKubeConfig(cloudregion string) (string, error) {
        return kubeConfigPath, nil
 }
 
-// Init loads the Kubernetes configuation values stored into the local configuration file
-func (k *KubernetesClient) Init(cloudregion string, iid string) error {
+// Init loads the Kubernetes configuration values stored into the local configuration file
+func (k *KubernetesClient) Init(ctx context.Context, cloudregion string, iid string) error {
+       ctx, span := tracer.Start(ctx, "KubernetesClient.Init")
+       defer span.End()
+
        if cloudregion == "" {
                return pkgerrors.New("Cloudregion is empty")
        }
@@ -368,7 +378,7 @@ func (k *KubernetesClient) Init(cloudregion string, iid string) error {
 
        k.instanceID = iid
 
-       configPath, err := k.getKubeConfig(cloudregion)
+       configPath, err := k.getKubeConfig(ctx, cloudregion)
        if err != nil {
                return pkgerrors.Wrap(err, "Get kubeconfig file")
        }
@@ -416,7 +426,9 @@ func (k *KubernetesClient) Init(cloudregion string, iid string) error {
        return nil
 }
 
-func (k *KubernetesClient) ensureNamespace(namespace string) error {
+func (k *KubernetesClient) ensureNamespace(ctx context.Context, namespace string) error {
+       _, span := tracer.Start(ctx, "KubernetesClient.ensureNamespace")
+       defer span.End()
 
        pluginImpl, err := plugin.GetPluginByKind("Namespace")
        if err != nil {
@@ -459,7 +471,9 @@ func (k *KubernetesClient) ensureNamespace(namespace string) error {
        return nil
 }
 
-func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate, namespace string) (helm.KubernetesResource, error) {
+func (k *KubernetesClient) CreateKind(ctx context.Context, resTempl helm.KubernetesResourceTemplate, namespace string) (helm.KubernetesResource, error) {
+       _, span := tracer.Start(ctx, "KubernetesClient.CreateKind")
+       defer span.End()
 
        if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
                return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
@@ -495,8 +509,10 @@ func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate,
        }, nil
 }
 
-func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate,
+func (k *KubernetesClient) updateKind(ctx context.Context, resTempl helm.KubernetesResourceTemplate,
        namespace string, createIfDoNotExist bool) (helm.KubernetesResource, error) {
+       _, span := tracer.Start(ctx, "KubernetesClient.updateKind")
+       defer span.End()
 
        if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
                return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + " does not exists")
@@ -541,18 +557,20 @@ func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate,
        }, nil
 }
 
-func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesResourceTemplate,
+func (k *KubernetesClient) createResources(ctx context.Context, sortedTemplates []helm.KubernetesResourceTemplate,
        namespace string) ([]helm.KubernetesResource, error) {
+       ctx, span := tracer.Start(ctx, "KubernetesClient.createResources")
+       defer span.End()
 
        var createdResources []helm.KubernetesResource
 
-       err := k.ensureNamespace(namespace)
+       err := k.ensureNamespace(ctx, namespace)
        if err != nil {
                return createdResources, pkgerrors.Wrap(err, "Creating Namespace")
        }
 
        for _, resTempl := range sortedTemplates {
-               resCreated, err := k.CreateKind(resTempl, namespace)
+               resCreated, err := k.CreateKind(ctx, resTempl, namespace)
                if err != nil {
                        return createdResources, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK)
                }
@@ -562,17 +580,19 @@ func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesReso
        return createdResources, nil
 }
 
-func (k *KubernetesClient) updateResources(sortedTemplates []helm.KubernetesResourceTemplate,
+func (k *KubernetesClient) updateResources(ctx context.Context, sortedTemplates []helm.KubernetesResourceTemplate,
        namespace string, createIfDoNotExist bool) ([]helm.KubernetesResource, error) {
+       ctx, span := tracer.Start(ctx, "KubernetesClient.updateResources")
+       defer span.End()
 
-       err := k.ensureNamespace(namespace)
+       err := k.ensureNamespace(ctx, namespace)
        if err != nil {
                return nil, pkgerrors.Wrap(err, "Creating Namespace")
        }
 
        var updatedResources []helm.KubernetesResource
        for _, resTempl := range sortedTemplates {
-               resUpdated, err := k.updateKind(resTempl, namespace, createIfDoNotExist)
+               resUpdated, err := k.updateKind(ctx, resTempl, namespace, createIfDoNotExist)
                if err != nil {
                        return nil, pkgerrors.Wrapf(err, "Error updating kind: %+v", resTempl.GVK)
                }
@@ -609,7 +629,9 @@ func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespac
        return nil
 }
 
-func (k *KubernetesClient) deleteResources(resources []helm.KubernetesResource, namespace string) error {
+func (k *KubernetesClient) deleteResources(ctx context.Context, resources []helm.KubernetesResource, namespace string) error {
+       _, deleteSpan := tracer.Start(ctx, "KubernetesClient.deleteResources")
+       defer deleteSpan.End()
        //TODO: Investigate if deletion should be in a particular order
        for _, res := range resources {
                err := k.DeleteKind(res, namespace)
diff --git a/src/k8splugin/internal/app/client_test.go b/src/k8splugin/internal/app/client_test.go
index f51c15f..5d5a2ab 100644
@@ -14,6 +14,7 @@ limitations under the License.
 package app
 
 import (
+       "context"
        "encoding/base64"
        "io/ioutil"
        "os"
@@ -73,7 +74,7 @@ func TestInit(t *testing.T) {
 
                kubeClient := KubernetesClient{}
                // Refer to the connection via its name
-               err = kubeClient.Init("mock_connection", "abcdefg")
+               err = kubeClient.Init(context.TODO(), "mock_connection", "abcdefg")
                if err != nil {
                        t.Fatalf("TestGetKubeClient returned an error (%s)", err)
                }
@@ -120,7 +121,7 @@ func TestCreateResources(t *testing.T) {
                        },
                }
 
-               _, err := k8.createResources(data, "testnamespace")
+               _, err := k8.createResources(context.TODO(), data, "testnamespace")
                if err != nil {
                        t.Fatalf("TestCreateResources returned an error (%s)", err)
                }
@@ -175,7 +176,7 @@ func TestDeleteResources(t *testing.T) {
                        },
                }
 
-               err := k8.deleteResources(data, "test")
+               err := k8.deleteResources(context.TODO(), data, "test")
                if err != nil {
                        t.Fatalf("TestCreateVNF returned an error (%s)", err)
                }
diff --git a/src/k8splugin/internal/app/config_backend.go b/src/k8splugin/internal/app/config_backend.go
index 046ef81..454684b 100644
@@ -530,10 +530,11 @@ func scheduleResources(c chan configResourceList) {
        // Keep thread running
        log.Printf("[scheduleResources]: START thread")
        for {
+               ctx := context.TODO()
                data := <-c
                //TODO: ADD Check to see if Application running
                ic := NewInstanceClient()
-               resp, err := ic.Find(context.TODO(), data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil)
+               resp, err := ic.Find(ctx, data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil)
                if (err != nil || len(resp) == 0) && data.action != "STOP" {
                        log.Println("Error finding a running instance. Retrying later...")
                        data.updatedResources <- []KubernetesConfigResource{}
@@ -546,7 +547,7 @@ func scheduleResources(c chan configResourceList) {
                        resources := []KubernetesConfigResource{}
                        for _, inst := range resp {
                                k8sClient := KubernetesClient{}
-                               err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
+                               err = k8sClient.Init(ctx, inst.Request.CloudRegion, inst.ID)
                                if err != nil {
                                        log.Printf("Getting CloudRegion Information: %s", err.Error())
                                        //Move onto the next cloud region
@@ -554,12 +555,12 @@ func scheduleResources(c chan configResourceList) {
                                }
                                for _, res := range data.resourceTemplates {
                                        var resToCreateOrUpdate = []helm.KubernetesResourceTemplate{res}
-                                       resProceeded, err := k8sClient.createResources(resToCreateOrUpdate, inst.Namespace)
+                                       resProceeded, err := k8sClient.createResources(ctx, resToCreateOrUpdate, inst.Namespace)
                                        errCreate := err
                                        var status string = ""
                                        if err != nil {
                                                // assuming - the err represent the resource already exist, so going for update
-                                               resProceeded, err = k8sClient.updateResources(resToCreateOrUpdate, inst.Namespace, false)
+                                               resProceeded, err = k8sClient.updateResources(ctx, resToCreateOrUpdate, inst.Namespace, false)
                                                if err != nil {
                                                        log.Printf("Error Creating resources: %s", errCreate.Error())
                                                        log.Printf("Error Updating resources: %s", err.Error())
@@ -583,7 +584,7 @@ func scheduleResources(c chan configResourceList) {
                        log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resources)
                        for _, inst := range resp {
                                k8sClient := KubernetesClient{}
-                               err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
+                               err = k8sClient.Init(ctx, inst.Request.CloudRegion, inst.ID)
                                if err != nil {
                                        log.Printf("Getting CloudRegion Information: %s", err.Error())
                                        //Move onto the next cloud region
@@ -593,7 +594,7 @@ func scheduleResources(c chan configResourceList) {
                                for _, res := range data.resources {
                                        tmpResources = append(tmpResources, res.Resource)
                                }
-                               err = k8sClient.deleteResources(helm.GetReverseK8sResources(tmpResources), inst.Namespace)
+                               err = k8sClient.deleteResources(context.TODO(), helm.GetReverseK8sResources(tmpResources), inst.Namespace)
                                if err != nil {
                                        log.Printf("Error Deleting resources: %s", err.Error())
                                        continue
diff --git a/src/k8splugin/internal/app/hook.go b/src/k8splugin/internal/app/hook.go
index 9645adc..e4a4e9d 100644
@@ -77,6 +77,7 @@ func (hc *HookClient) ExecHook(
        timeout int64,
        startIndex int,
        dbData *InstanceDbData) error {
+       ctx := context.TODO()
        executingHooks := hc.getHookByEvent(hs, hook)
        key := InstanceKey{
                ID: hc.id,
@@ -109,7 +110,7 @@ func (hc *HookClient) ExecHook(
                        GVK:      h.KRT.GVK,
                        FilePath: h.KRT.FilePath,
                }
-               createdHook, err := k8sClient.CreateKind(resTempl, hc.kubeNameSpace)
+               createdHook, err := k8sClient.CreateKind(ctx, resTempl, hc.kubeNameSpace)
                if err != nil {
                        log.Printf("  Instance: %s, Warning: %s hook %s, filePath: %s, error: %s", hc.id, hook, h.Hook.Name, h.KRT.FilePath, err)
                        hc.deleteHookByPolicy(h, release.HookFailed, k8sClient)
@@ -151,7 +152,7 @@ func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDelete
        }
        if hookHasDeletePolicy(h, policy) {
                log.Printf("  Instance: %s, Deleting hook %s due to %q policy", hc.id, h.Hook.Name, policy)
-               if errHookDelete := k8sClient.deleteResources(append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil {
+               if errHookDelete := k8sClient.deleteResources(context.TODO(), append([]helm.KubernetesResource{}, rss), hc.kubeNameSpace); errHookDelete != nil {
                        if strings.Contains(errHookDelete.Error(), "not found") {
                                return nil
                        } else {
@@ -163,7 +164,7 @@ func (hc *HookClient) deleteHookByPolicy(h *helm.Hook, policy release.HookDelete
                        isDeleted := false
                        for !isDeleted {
                                log.Printf("  Instance: %s, Waiting on deleting hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy)
-                               if _, err := k8sClient.GetResourceStatus(rss, hc.kubeNameSpace); err != nil {
+                               if _, err := k8sClient.GetResourceStatus(context.TODO(), rss, hc.kubeNameSpace); err != nil {
                                        if strings.Contains(err.Error(), "not found") {
                                                log.Printf("  Instance: %s, Deleted hook %s for release %s due to %q policy", hc.id, h.Hook.Name, hc.id, policy)
                                                return nil
diff --git a/src/k8splugin/internal/app/hook_test.go b/src/k8splugin/internal/app/hook_test.go
index 9c63194..79b6bcc 100644
@@ -14,36 +14,38 @@ limitations under the License.
 package app
 
 import (
+       "context"
        "encoding/base64"
-       "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
+       "io/ioutil"
+       "testing"
+
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
-       "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+       "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
+       "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils"
        "helm.sh/helm/v3/pkg/release"
        "helm.sh/helm/v3/pkg/time"
-       "io/ioutil"
        "k8s.io/apimachinery/pkg/runtime/schema"
-       "testing"
 )
 
 func generateHookList() []*helm.Hook {
        var hookList []*helm.Hook
        preInstallHook1 := helm.Hook{
                Hook: release.Hook{
-                       Name : "preinstall1",
-                       Kind : "Job",
-                       Path : "",
-                       Manifest : "",
-                       Events : []release.HookEvent{release.HookPreInstall},
-                       LastRun : release.HookExecution{
+                       Name:     "preinstall1",
+                       Kind:     "Job",
+                       Path:     "",
+                       Manifest: "",
+                       Events:   []release.HookEvent{release.HookPreInstall},
+                       LastRun: release.HookExecution{
                                StartedAt:   time.Now(),
                                CompletedAt: time.Now(),
                                Phase:       "",
                        },
-                       Weight : -5,
-                       DeletePolicies : []release.HookDeletePolicy{},
+                       Weight:         -5,
+                       DeletePolicies: []release.HookDeletePolicy{},
                },
-               KRT:  helm.KubernetesResourceTemplate{
+               KRT: helm.KubernetesResourceTemplate{
                        GVK: schema.GroupVersionKind{
                                Group:   "batch",
                                Version: "v1",
@@ -54,20 +56,20 @@ func generateHookList() []*helm.Hook {
        }
        preInstallHook2 := helm.Hook{
                Hook: release.Hook{
-                       Name : "preinstall2",
-                       Kind : "Deployment",
-                       Path : "",
-                       Manifest : "",
-                       Events : []release.HookEvent{release.HookPreInstall},
-                       LastRun : release.HookExecution{
+                       Name:     "preinstall2",
+                       Kind:     "Deployment",
+                       Path:     "",
+                       Manifest: "",
+                       Events:   []release.HookEvent{release.HookPreInstall},
+                       LastRun: release.HookExecution{
                                StartedAt:   time.Now(),
                                CompletedAt: time.Now(),
                                Phase:       "",
                        },
-                       Weight : 0,
-                       DeletePolicies : []release.HookDeletePolicy{},
+                       Weight:         0,
+                       DeletePolicies: []release.HookDeletePolicy{},
                },
-               KRT:  helm.KubernetesResourceTemplate{
+               KRT: helm.KubernetesResourceTemplate{
                        GVK: schema.GroupVersionKind{
                                Group:   "batch",
                                Version: "v1",
@@ -78,20 +80,20 @@ func generateHookList() []*helm.Hook {
        }
        postInstallHook := helm.Hook{
                Hook: release.Hook{
-                       Name : "postinstall",
-                       Kind : "Job",
-                       Path : "",
-                       Manifest : "",
-                       Events : []release.HookEvent{release.HookPostInstall},
-                       LastRun : release.HookExecution{
+                       Name:     "postinstall",
+                       Kind:     "Job",
+                       Path:     "",
+                       Manifest: "",
+                       Events:   []release.HookEvent{release.HookPostInstall},
+                       LastRun: release.HookExecution{
                                StartedAt:   time.Now(),
                                CompletedAt: time.Now(),
                                Phase:       "",
                        },
-                       Weight : -5,
-                       DeletePolicies : []release.HookDeletePolicy{},
+                       Weight:         -5,
+                       DeletePolicies: []release.HookDeletePolicy{},
                },
-               KRT:  helm.KubernetesResourceTemplate{
+               KRT: helm.KubernetesResourceTemplate{
                        GVK: schema.GroupVersionKind{
                                Group:   "batch",
                                Version: "v1",
@@ -102,20 +104,20 @@ func generateHookList() []*helm.Hook {
        }
        preDeleteHook := helm.Hook{
                Hook: release.Hook{
-                       Name : "predelete",
-                       Kind : "Job",
-                       Path : "",
-                       Manifest : "",
-                       Events : []release.HookEvent{release.HookPreDelete},
-                       LastRun : release.HookExecution{
+                       Name:     "predelete",
+                       Kind:     "Job",
+                       Path:     "",
+                       Manifest: "",
+                       Events:   []release.HookEvent{release.HookPreDelete},
+                       LastRun: release.HookExecution{
                                StartedAt:   time.Now(),
                                CompletedAt: time.Now(),
                                Phase:       "",
                        },
-                       Weight : -5,
-                       DeletePolicies : []release.HookDeletePolicy{},
+                       Weight:         -5,
+                       DeletePolicies: []release.HookDeletePolicy{},
                },
-               KRT:  helm.KubernetesResourceTemplate{
+               KRT: helm.KubernetesResourceTemplate{
                        GVK: schema.GroupVersionKind{
                                Group:   "batch",
                                Version: "v1",
@@ -126,20 +128,20 @@ func generateHookList() []*helm.Hook {
        }
        postDeleteHook := helm.Hook{
                Hook: release.Hook{
-                       Name : "postdelete",
-                       Kind : "Job",
-                       Path : "",
-                       Manifest : "",
-                       Events : []release.HookEvent{release.HookPostDelete},
-                       LastRun : release.HookExecution{
+                       Name:     "postdelete",
+                       Kind:     "Job",
+                       Path:     "",
+                       Manifest: "",
+                       Events:   []release.HookEvent{release.HookPostDelete},
+                       LastRun: release.HookExecution{
                                StartedAt:   time.Now(),
                                CompletedAt: time.Now(),
                                Phase:       "",
                        },
-                       Weight : -5,
-                       DeletePolicies : []release.HookDeletePolicy{},
+                       Weight:         -5,
+                       DeletePolicies: []release.HookDeletePolicy{},
                },
-               KRT:  helm.KubernetesResourceTemplate{
+               KRT: helm.KubernetesResourceTemplate{
                        GVK: schema.GroupVersionKind{
                                Group:   "batch",
                                Version: "v1",
@@ -241,24 +243,24 @@ func TestExecHook(t *testing.T) {
        }
 
        k8sClient := KubernetesClient{}
-       err = k8sClient.Init("mock_connection", "test")
+       err = k8sClient.Init(context.TODO(), "mock_connection", "test")
        if err != nil {
                t.Fatal(err.Error())
        }
-       err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall,10,0, nil)
+       err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, 10, 0, nil)
        if err != nil {
                t.Fatal(err.Error())
        }
-       err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall,10,0, nil)
+       err = hookClient.ExecHook(k8sClient, hookList, release.HookPostInstall, 10, 0, nil)
        if err != nil {
                t.Fatal(err.Error())
        }
-       err = hookClient.ExecHook(k8sClient, hookList, release.HookPreDelete,10,0, nil)
+       err = hookClient.ExecHook(k8sClient, hookList, release.HookPreDelete, 10, 0, nil)
        if err != nil {
                t.Fatal(err.Error())
        }
-       err = hookClient.ExecHook(k8sClient, hookList, release.HookPostDelete,10,0, nil)
+       err = hookClient.ExecHook(k8sClient, hookList, release.HookPostDelete, 10, 0, nil)
        if err != nil {
                t.Fatal(err.Error())
        }
-}
\ No newline at end of file
+}
diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go
index 94eb216..c6a4c59 100644
@@ -28,6 +28,9 @@ import (
        "strings"
        "time"
 
+       "go.opentelemetry.io/otel"
+       "go.opentelemetry.io/otel/attribute"
+       semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
        appsv1 "k8s.io/api/apps/v1"
        batchv1 "k8s.io/api/batch/v1"
        corev1 "k8s.io/api/core/v1"
@@ -134,6 +137,8 @@ type InstanceKey struct {
        ID string `json:"id"`
 }
 
+var tracer = otel.Tracer("internal/app")
+
 // We will use json marshalling to convert to string to
 // preserve the underlying structure.
 func (dk InstanceKey) String() string {
@@ -284,7 +289,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
        }
 
        k8sClient := KubernetesClient{}
-       err = k8sClient.Init(i.CloudRegion, finalId)
+       err = k8sClient.Init(ctx, i.CloudRegion, finalId)
        if err != nil {
                namegenerator.Release(generatedId)
                return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
@@ -326,7 +331,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
                PostUpgradeTimeout: hookTimeoutInfo.postUpgradeTimeout,
        }
 
-       err = k8sClient.ensureNamespace(profile.Namespace)
+       err = k8sClient.ensureNamespace(ctx, profile.Namespace)
        if err != nil {
                namegenerator.Release(generatedId)
                return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace")
@@ -335,7 +340,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
        key := InstanceKey{
                ID: finalId,
        }
-       err = db.DBconn.Create(context.TODO(), v.storeName, key, v.tagInst, dbData)
+       err = db.DBconn.Create(ctx, v.storeName, key, v.tagInst, dbData)
        if err != nil {
                namegenerator.Release(generatedId)
                return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry")
@@ -343,7 +348,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
 
        if len(crdList) > 0 {
                log.Printf("Pre-Installing CRDs")
-               _, err = k8sClient.createResources(crdList, profile.Namespace)
+               _, err = k8sClient.createResources(ctx, crdList, profile.Namespace)
 
                if err != nil {
                        return InstanceResponse{}, pkgerrors.Wrap(err, "Pre-Installing CRDs")
@@ -355,7 +360,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
                err = hookClient.ExecHook(k8sClient, hookList, release.HookPreInstall, hookTimeoutInfo.preInstallTimeOut, 0, &dbData)
                if err != nil {
                        log.Printf("Error running preinstall hooks for release %s, Error: %s. Stop here", releaseName, err)
-                       err2 := db.DBconn.Delete(context.TODO(), v.storeName, key, v.tagInst)
+                       err2 := db.DBconn.Delete(ctx, v.storeName, key, v.tagInst)
                        if err2 != nil {
                                log.Printf("Error cleaning failed instance in DB, please check DB.")
                        } else {
@@ -366,9 +371,9 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
        }
 
        dbData.Status = "CREATING"
-       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
        if err != nil {
-               err2 := db.DBconn.Delete(context.TODO(), v.storeName, key, v.tagInst)
+               err2 := db.DBconn.Delete(ctx, v.storeName, key, v.tagInst)
                if err2 != nil {
                        log.Printf("Delete Instance DB Entry for release %s has error.", releaseName)
                } else {
@@ -378,15 +383,15 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
        }
 
        //Main rss creation is supposed to be very quick -> no need to support recover for main rss
-       createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace)
+       createdResources, err := k8sClient.createResources(ctx, sortedTemplates, profile.Namespace)
        if err != nil {
                if len(createdResources) > 0 {
                        log.Printf("[Instance] Reverting created resources on Error: %s", err.Error())
-                       k8sClient.deleteResources(helm.GetReverseK8sResources(createdResources), profile.Namespace)
+                       k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(createdResources), profile.Namespace)
                }
                log.Printf("  Instance: %s, Main rss are failed, skip post-install and remove instance in DB", finalId)
                //main rss creation failed -> remove instance in DB
-               err2 := db.DBconn.Delete(context.TODO(), v.storeName, key, v.tagInst)
+               err2 := db.DBconn.Delete(ctx, v.storeName, key, v.tagInst)
                if err2 != nil {
                        log.Printf("Delete Instance DB Entry for release %s has error.", releaseName)
                } else {
@@ -397,7 +402,7 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
 
        dbData.Status = "CREATED"
        dbData.Resources = createdResources
-       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
        if err != nil {
                return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
        }
@@ -423,14 +428,14 @@ func (v *InstanceClient) Create(ctx context.Context, i InstanceRequest, newId st
                        } else {
                                dbData.Status = "DONE"
                        }
-                       err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+                       err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
                        if err != nil {
                                log.Printf("Update Instance DB Entry for release %s has error.", releaseName)
                        }
                }()
        } else {
                dbData.Status = "DONE"
-               err = db.DBconn.Update(context.TODO(), v.storeName, key, v.tagInst, dbData)
+               err = db.DBconn.Update(ctx, v.storeName, key, v.tagInst, dbData)
                if err != nil {
                        log.Printf("Update Instance DB Entry for release %s has error.", releaseName)
                }
@@ -538,7 +543,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
        }
 
        k8sClient := KubernetesClient{}
-       err = k8sClient.Init(i.CloudRegion, id)
+       err = k8sClient.Init(ctx, i.CloudRegion, id)
        if err != nil {
                return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
        }
@@ -579,7 +584,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
                PostUpgradeTimeout: hookTimeoutInfo.postUpgradeTimeout,
        }
 
-       err = k8sClient.ensureNamespace(profile.Namespace)
+       err = k8sClient.ensureNamespace(ctx, profile.Namespace)
        if err != nil {
                return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Namespace")
        }
@@ -591,7 +596,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
 
        if len(crdList) > 0 {
                log.Printf("Pre-Installing CRDs")
-               _, err = k8sClient.createResources(crdList, profile.Namespace)
+               _, err = k8sClient.createResources(ctx, crdList, profile.Namespace)
 
                if err != nil {
                        return InstanceResponse{}, pkgerrors.Wrap(err, "Pre-Installing CRDs")
@@ -613,7 +618,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
                return InstanceResponse{}, pkgerrors.Wrap(err, "Update Instance DB Entry")
        }
 
-       upgradedResources, err := k8sClient.updateResources(sortedTemplates, profile.Namespace, true)
+       upgradedResources, err := k8sClient.updateResources(ctx, sortedTemplates, profile.Namespace, true)
        if err != nil {
                log.Printf("  Instance: %s, Main rss are failed, skip post-upgrade", id)
                return InstanceResponse{}, pkgerrors.Wrap(err, "Upgrade Kubernetes Resources")
@@ -628,8 +633,8 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
                                        exists = true
                                        break
                                } else {
-                                       status1, err := k8sClient.GetResourceStatus(res, profile.Namespace)
-                                       status2, err2 := k8sClient.GetResourceStatus(pastRes, currentInstance.Namespace)
+                                       status1, err := k8sClient.GetResourceStatus(ctx, res, profile.Namespace)
+                                       status2, err2 := k8sClient.GetResourceStatus(ctx, pastRes, currentInstance.Namespace)
                                        if err == nil && err2 == nil && status1.Value() == status2.Value() {
                                                //only when resource is namespace-less
                                                exists = true
@@ -643,7 +648,7 @@ func (v *InstanceClient) Upgrade(ctx context.Context, id string, u UpgradeReques
                }
        }
 
-       err = k8sClient.deleteResources(helm.GetReverseK8sResources(resToDelete), currentInstance.Namespace)
+       err = k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(resToDelete), currentInstance.Namespace)
 
        configClient := NewConfigClient()
        configList, err := configClient.List(id)
@@ -814,6 +819,12 @@ func (v *InstanceClient) Query(ctx context.Context, id, apiVersion, kind, name,
 
 // Status returns the status for the instance
 func (v *InstanceClient) Status(ctx context.Context, id string, checkReady bool) (InstanceStatus, error) {
+       ctx, statusSpan := tracer.Start(ctx, "InstanceClient.Status")
+       statusSpan.SetAttributes(
+               attribute.String(string(semconv.CodeFunctionKey), "Status"),
+               attribute.String(string(semconv.CodeNamespaceKey), "k8splugin/internal/app.InstanceClient"),
+       )
+       defer statusSpan.End()
        //Read the status from the DB
        key := InstanceKey{
                ID: id,
@@ -836,7 +847,7 @@ func (v *InstanceClient) Status(ctx context.Context, id string, checkReady bool)
        }
 
        k8sClient := KubernetesClient{}
-       err = k8sClient.Init(resResp.Request.CloudRegion, id)
+       err = k8sClient.Init(ctx, resResp.Request.CloudRegion, id)
        if err != nil {
                return InstanceStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
        }
@@ -847,7 +858,7 @@ func (v *InstanceClient) Status(ctx context.Context, id string, checkReady bool)
        }
 
        cumulatedErrorMsg := make([]string, 0)
-       podsStatus, err := k8sClient.getPodsByLabel(resResp.Namespace)
+       podsStatus, err := k8sClient.getPodsByLabel(ctx, resResp.Namespace)
        if err != nil {
                cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
        }
@@ -861,7 +872,7 @@ Main:
                                continue Main //Don't double check pods if someone decided to define pod explicitly in helm chart
                        }
                }
-               status, err := k8sClient.GetResourceStatus(oneResource, resResp.Namespace)
+               status, err := k8sClient.GetResourceStatus(ctx, oneResource, resResp.Namespace)
                if err != nil {
                        cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
                        isReady = false
@@ -1093,7 +1104,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error {
        }
 
        k8sClient := KubernetesClient{}
-       err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
+       err = k8sClient.Init(ctx, inst.Request.CloudRegion, inst.ID)
        if err != nil {
                return pkgerrors.Wrap(err, "Getting CloudRegion Information")
        }
@@ -1131,7 +1142,7 @@ func (v *InstanceClient) Delete(ctx context.Context, id string) error {
                return pkgerrors.Wrap(err, "Cleanup Config Resources")
        }
 
-       err = k8sClient.deleteResources(helm.GetReverseK8sResources(inst.Resources), inst.Namespace)
+       err = k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(inst.Resources), inst.Namespace)
        if err != nil {
                return pkgerrors.Wrap(err, "Deleting Instance Resources")
        }
@@ -1206,7 +1217,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
                return nil
        }
        k8sClient := KubernetesClient{}
-       err = k8sClient.Init(instance.Request.CloudRegion, id)
+       err = k8sClient.Init(ctx, instance.Request.CloudRegion, id)
        if err != nil {
                log.Printf("  Error getting CloudRegion %s", instance.Request.CloudRegion)
                return nil
@@ -1245,7 +1256,7 @@ func (v *InstanceClient) RecoverCreateOrDelete(ctx context.Context, id string) e
                                return
                        }
 
-                       err = k8sClient.deleteResources(helm.GetReverseK8sResources(instance.Resources), instance.Namespace)
+                       err = k8sClient.deleteResources(ctx, helm.GetReverseK8sResources(instance.Resources), instance.Namespace)
                        if err != nil {
                                log.Printf("  Error running deleting instance resources, error: %s", err)
                                return
@@ -1322,13 +1333,13 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H
                                GVK:  h.KRT.GVK,
                                Name: h.Hook.Name,
                        }
-                       if _, err := k8sClient.GetResourceStatus(res, hookClient.kubeNameSpace); err == nil {
+                       if _, err := k8sClient.GetResourceStatus(context.TODO(), res, hookClient.kubeNameSpace); err == nil {
                                remainHookRss = append(remainHookRss, res)
                                log.Printf("  Rss %s will be deleted.", res.Name)
                        }
                }
                if len(remainHookRss) > 0 {
-                       err = k8sClient.deleteResources(remainHookRss, hookClient.kubeNameSpace)
+                       err = k8sClient.deleteResources(context.TODO(), remainHookRss, hookClient.kubeNameSpace)
                        if err != nil {
                                log.Printf("Error cleaning Hook Rss, please do it manually if needed. Error: %s", err.Error())
                        }
diff --git a/src/k8splugin/internal/app/query.go b/src/k8splugin/internal/app/query.go
index 6a1b351..2c1b546 100644
@@ -19,6 +19,8 @@
 package app
 
 import (
+       "context"
+
        pkgerrors "github.com/pkg/errors"
 )
 
@@ -55,14 +57,14 @@ func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labe
        //Read the status from the DD
 
        k8sClient := KubernetesClient{}
-       err := k8sClient.Init(cloudRegion, "dummy") //we don't care about instance id in this request
+       err := k8sClient.Init(context.TODO(), cloudRegion, "dummy") //we don't care about instance id in this request
        if err != nil {
                return QueryStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
        }
 
        var resourcesStatus []ResourceStatus
        if name != "" {
-               resList, err := k8sClient.queryResources(apiVersion, kind, labels, namespace)
+               resList, err := k8sClient.queryResources(context.TODO(), apiVersion, kind, labels, namespace)
                if err != nil {
                        return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources")
                }
@@ -79,7 +81,7 @@ func (v *QueryClient) Query(namespace, cloudRegion, apiVersion, kind, name, labe
                        resourcesStatus = resList
                }
        } else {
-               resList, err := k8sClient.queryResources(apiVersion, kind, labels, namespace)
+               resList, err := k8sClient.queryResources(context.TODO(), apiVersion, kind, labels, namespace)
                if err != nil {
                        return QueryStatus{}, pkgerrors.Wrap(err, "Querying Resources")
                }
diff --git a/src/k8splugin/internal/app/subscription.go b/src/k8splugin/internal/app/subscription.go
index aa66033..48ecc00 100644
@@ -394,7 +394,8 @@ func (iss *InstanceStatusSubClient) refreshWatchers(instanceId, subId string) er
        })
        v := NewInstanceClient()
        k8sClient := KubernetesClient{}
-       instance, err := v.Get(context.TODO(), instanceId)
+       ctx := context.TODO()
+       instance, err := v.Get(ctx, instanceId)
        if err != nil {
                return pkgerrors.Wrap(err, "Cannot get instance for notify thread")
        }
@@ -403,7 +404,7 @@ func (iss *InstanceStatusSubClient) refreshWatchers(instanceId, subId string) er
        if err != nil {
                return pkgerrors.Wrap(err, "Unable to find Profile instance status")
        }
-       err = k8sClient.Init(instance.Request.CloudRegion, instanceId)
+       err = k8sClient.Init(ctx, instance.Request.CloudRegion, instanceId)
        if err != nil {
                return pkgerrors.Wrap(err, "Cannot set k8s client for instance")
        }
diff --git a/src/k8splugin/internal/healthcheck/healthcheck.go b/src/k8splugin/internal/healthcheck/healthcheck.go
index 9bc1a87..49d8f1e 100644
@@ -109,13 +109,14 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err
 
        //Determine Cloud Region and namespace
        v := app.NewInstanceClient()
-       instance, err := v.Get(context.TODO(), instanceId)
+       ctx := context.TODO()
+       instance, err := v.Get(ctx, instanceId)
        if err != nil {
                return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Getting instance")
        }
 
        k8sClient := app.KubernetesClient{}
-       err = k8sClient.Init(instance.Request.CloudRegion, instanceId)
+       err = k8sClient.Init(ctx, instance.Request.CloudRegion, instanceId)
        if err != nil {
                return InstanceMiniHCStatus{}, pkgerrors.Wrap(err, "Preparing KubeClient")
        }
@@ -153,7 +154,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err
 
        for _, h := range hooks {
                h.Status.StartedAt = time.Now()
-               kr, err := k8sClient.CreateKind(h.Definition.KRT, instance.Namespace)
+               kr, err := k8sClient.CreateKind(ctx, h.Definition.KRT, instance.Namespace)
                if err != nil {
                        // Set status fields
                        h.Status.Status = release.HookPhaseFailed
@@ -163,7 +164,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err
                        retErr := "Starting hook " + h.Status.Name
 
                        // Dump to DB
-                       err = db.DBconn.Create(context.TODO(), ihc.storeName, key, ihc.tagInst, ihcs)
+                       err = db.DBconn.Create(ctx, ihc.storeName, key, ihc.tagInst, ihcs)
                        if err != nil {
                                retErr = retErr + " and couldn't save to DB"
                        }
@@ -175,7 +176,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err
                        h.Status.KR = kr
                }
        }
-       err = db.DBconn.Create(context.TODO(), ihc.storeName, key, ihc.tagInst, ihcs)
+       err = db.DBconn.Create(ctx, ihc.storeName, key, ihc.tagInst, ihcs)
        if err != nil {
                return instanceMiniHCStatusFromStatus(ihcs),
                        pkgerrors.Wrap(err, "Creating Instance DB Entry")
@@ -222,7 +223,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err
                                        "Reason":        map[bool]string{true: "Hook finished", false: "All hooks finished"}[b],
                                })
                                if b { //Some hook finished - need to update DB
-                                       err = db.DBconn.Update(context.TODO(), ihc.storeName, key, ihc.tagInst, ihcs)
+                                       err = db.DBconn.Update(ctx, ihc.storeName, key, ihc.tagInst, ihcs)
                                        if err != nil {
                                                log.Error("Couldn't update database", log.Fields{
                                                        "Store":   ihc.storeName,
@@ -249,7 +250,7 @@ func (ihc InstanceHCClient) Create(instanceId string) (InstanceMiniHCStatus, err
                                                }
                                        }
                                        ihcs.Status = finalResult
-                                       err = db.DBconn.Update(context.TODO(), ihc.storeName, key, ihc.tagInst, ihcs)
+                                       err = db.DBconn.Update(ctx, ihc.storeName, key, ihc.tagInst, ihcs)
                                        if err != nil {
                                                log.Error("Couldn't update database", log.Fields{
                                                        "Store":   ihc.storeName,
@@ -295,7 +296,7 @@ func (ihc InstanceHCClient) Delete(instanceId, healthcheckId string) error {
                return pkgerrors.Wrap(err, "Error querying Healthcheck status")
        }
        k8sClient := app.KubernetesClient{}
-       err = k8sClient.Init(instance.Request.CloudRegion, instanceId)
+       err = k8sClient.Init(context.TODO(), instance.Request.CloudRegion, instanceId)
        if err != nil {
                return pkgerrors.Wrap(err, "Preparing KubeClient")
        }
diff --git a/src/k8splugin/internal/healthcheck/hooks.go b/src/k8splugin/internal/healthcheck/hooks.go
index 0b1be9a..f62043b 100644
@@ -14,6 +14,7 @@ limitations under the License.
 package healthcheck
 
 import (
+       "context"
        "time"
 
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/app"
@@ -67,7 +68,7 @@ func getHookState(hookStatus HookStatus, k8sClient app.KubernetesClient, namespa
        }
 
        for {
-               res, err := k8sClient.GetResourceStatus(hookStatus.KR, namespace)
+               res, err := k8sClient.GetResourceStatus(context.TODO(), hookStatus.KR, namespace)
                if err != nil {
                        log.Error("Unable to check Resource Status", log.Fields{
                                "Resource":  hookStatus.KR,