2 Copyright 2018 Intel Corporation.
3 Copyright © 2021 Samsung Electronics
4 Copyright © 2021 Orange
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
26 "github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
27 "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
28 "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
29 log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
30 "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
32 pkgerrors "github.com/pkg/errors"
33 "k8s.io/apimachinery/pkg/api/meta"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36 "k8s.io/apimachinery/pkg/runtime"
37 "k8s.io/apimachinery/pkg/runtime/schema"
38 "k8s.io/client-go/discovery/cached/disk"
39 "k8s.io/client-go/dynamic"
40 "k8s.io/client-go/kubernetes"
41 "k8s.io/client-go/rest"
42 "k8s.io/client-go/restmapper"
43 "k8s.io/client-go/tools/clientcmd"
46 // KubernetesClient encapsulates the different clients' interfaces
47 // we need when interacting with a Kubernetes cluster
48 type KubernetesClient struct {
49 rawConfig clientcmd.ClientConfig
50 restConfig *rest.Config
51 clientSet kubernetes.Interface
52 dynamicClient dynamic.Interface
53 discoverClient *disk.CachedDiscoveryClient
54 restMapper meta.RESTMapper
58 // ResourceStatus holds Resource Runtime Data
59 type ResourceStatus struct {
60 Name string `json:"name"`
61 GVK schema.GroupVersionKind `json:"GVK"`
62 Status unstructured.Unstructured `json:"status"`
65 // getPodsByLabel yields status of all pods under given instance ID
66 func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) {
67 client := k.GetStandardClient().CoreV1().Pods(namespace)
68 listOpts := metav1.ListOptions{
69 LabelSelector: config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID,
71 podList, err := client.List(context.TODO(), listOpts)
73 return nil, pkgerrors.Wrap(err, "Retrieving PodList from cluster")
75 resp := make([]ResourceStatus, 0, len(podList.Items))
76 cumulatedErrorMsg := make([]string, 0)
77 for _, pod := range podList.Items {
78 podContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
80 cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
83 var unstrPod unstructured.Unstructured
84 unstrPod.SetUnstructuredContent(podContent)
85 podStatus := ResourceStatus{
86 Name: unstrPod.GetName(),
87 GVK: schema.FromAPIVersionAndKind("v1", "Pod"),
90 resp = append(resp, podStatus)
92 if len(cumulatedErrorMsg) != 0 {
93 return resp, pkgerrors.New("Converting podContent to unstruct error:\n" +
94 strings.Join(cumulatedErrorMsg, "\n"))
99 func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, namespace string) ([]ResourceStatus, error) {
100 dynClient := k.GetDynamicClient()
101 mapper := k.GetMapper()
102 gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
103 mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
105 return nil, pkgerrors.Wrap(err, "Preparing mapper based on GVK")
108 gvr := mapping.Resource
109 opts := metav1.ListOptions{
110 LabelSelector: labelSelector,
112 var unstrList *unstructured.UnstructuredList
113 switch mapping.Scope.Name() {
114 case meta.RESTScopeNameNamespace:
115 unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts)
116 case meta.RESTScopeNameRoot:
117 unstrList, err = dynClient.Resource(gvr).List(context.TODO(), opts)
119 return nil, pkgerrors.New("Got an unknown RESTScopeName for mapping: " + gvk.String())
122 return nil, pkgerrors.Wrap(err, "Querying for resources")
125 resp := make([]ResourceStatus, 0)
126 for _, unstr := range unstrList.Items {
127 if unstr.GetName() != "" {
128 resp = append(resp, ResourceStatus{unstr.GetName(), gvk, unstr})
134 // GetResourcesStatus yields status of given generic resource
135 func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namespace string) (ResourceStatus, error) {
136 dynClient := k.GetDynamicClient()
137 mapper := k.GetMapper()
138 mapping, err := mapper.RESTMapping(schema.GroupKind{
139 Group: res.GVK.Group,
143 return ResourceStatus{},
144 pkgerrors.Wrap(err, "Preparing mapper based on GVK")
147 gvr := mapping.Resource
148 opts := metav1.GetOptions{}
149 var unstruct *unstructured.Unstructured
150 switch mapping.Scope.Name() {
151 case meta.RESTScopeNameNamespace:
152 unstruct, err = dynClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), res.Name, opts)
153 case meta.RESTScopeNameRoot:
154 unstruct, err = dynClient.Resource(gvr).Get(context.TODO(), res.Name, opts)
156 return ResourceStatus{}, pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + res.GVK.String())
160 return ResourceStatus{}, pkgerrors.Wrap(err, "Getting object status")
163 return ResourceStatus{unstruct.GetName(), res.GVK, *unstruct}, nil
166 // getKubeConfig uses the connectivity client to get the kubeconfig based on the name
167 // of the cloudregion. This is written out to a file.
168 func (k *KubernetesClient) getKubeConfig(cloudregion string) (string, error) {
170 conn := connection.NewConnectionClient()
171 kubeConfigPath, err := conn.Download(cloudregion)
173 return "", pkgerrors.Wrap(err, "Downloading kubeconfig")
176 return kubeConfigPath, nil
179 // Init loads the Kubernetes configuation values stored into the local configuration file
180 func (k *KubernetesClient) Init(cloudregion string, iid string) error {
181 if cloudregion == "" {
182 return pkgerrors.New("Cloudregion is empty")
186 return pkgerrors.New("Instance ID is empty")
191 configPath, err := k.getKubeConfig(cloudregion)
193 return pkgerrors.Wrap(err, "Get kubeconfig file")
196 //Remove kubeconfigfile after the clients are created
197 defer os.Remove(configPath)
199 config, err := clientcmd.BuildConfigFromFlags("", configPath)
201 return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error")
204 k.clientSet, err = kubernetes.NewForConfig(config)
209 k.dynamicClient, err = dynamic.NewForConfig(config)
211 return pkgerrors.Wrap(err, "Creating dynamic client")
214 k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute)
216 return pkgerrors.Wrap(err, "Creating discovery client")
219 k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient)
220 k.restConfig = config
223 kubeFile, err := os.Open(configPath)
225 return pkgerrors.Wrap(err, "Opening kubeConfig")
227 kubeData, err := ioutil.ReadAll(kubeFile)
229 return pkgerrors.Wrap(err, "Reading kubeConfig")
231 k.rawConfig, err = clientcmd.NewClientConfigFromBytes(kubeData)
233 return pkgerrors.Wrap(err, "Creating rawConfig")
239 func (k *KubernetesClient) ensureNamespace(namespace string) error {
241 pluginImpl, err := plugin.GetPluginByKind("Namespace")
243 return pkgerrors.Wrap(err, "Loading Namespace Plugin")
246 ns, err := pluginImpl.Get(helm.KubernetesResource{
248 GVK: schema.GroupVersionKind{
255 // Check for errors getting the namespace while ignoring errors where the namespace does not exist
256 // Error message when namespace does not exist: "namespaces "namespace-name" not found"
257 if err != nil && strings.Contains(err.Error(), "not found") == false {
258 log.Error("Error checking for namespace", log.Fields{
260 "namespace": namespace,
262 return pkgerrors.Wrap(err, "Error checking for namespace: "+namespace)
266 log.Info("Creating Namespace", log.Fields{
267 "namespace": namespace,
270 _, err = pluginImpl.Create("", namespace, k)
272 log.Error("Error Creating Namespace", log.Fields{
274 "namespace": namespace,
276 return pkgerrors.Wrap(err, "Error creating "+namespace+" namespace")
282 func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate,
283 namespace string) (helm.KubernetesResource, error) {
285 if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
286 return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
289 log.Info("Processing Kubernetes Resource", log.Fields{
290 "filepath": resTempl.FilePath,
293 pluginImpl, err := plugin.GetPluginByKind(resTempl.GVK.Kind)
295 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error loading plugin")
298 createdResourceName, err := pluginImpl.Create(resTempl.FilePath, namespace, k)
300 log.Error("Error Creating Resource", log.Fields{
303 "filepath": resTempl.FilePath,
305 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error in plugin "+resTempl.GVK.Kind+" plugin")
308 log.Info("Created Kubernetes Resource", log.Fields{
309 "resource": createdResourceName,
313 return helm.KubernetesResource{
315 Name: createdResourceName,
319 func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate,
320 namespace string) (helm.KubernetesResource, error) {
322 if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
323 return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
326 log.Info("Processing Kubernetes Resource", log.Fields{
327 "filepath": resTempl.FilePath,
330 pluginImpl, err := plugin.GetPluginByKind(resTempl.GVK.Kind)
332 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error loading plugin")
335 updatedResourceName, err := pluginImpl.Update(resTempl.FilePath, namespace, k)
337 log.Error("Error Updating Resource", log.Fields{
340 "filepath": resTempl.FilePath,
342 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error in plugin "+resTempl.GVK.Kind+" plugin")
345 log.Info("Updated Kubernetes Resource", log.Fields{
346 "resource": updatedResourceName,
350 return helm.KubernetesResource{
352 Name: updatedResourceName,
356 func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesResourceTemplate,
357 namespace string) ([]helm.KubernetesResource, error) {
359 err := k.ensureNamespace(namespace)
361 return nil, pkgerrors.Wrap(err, "Creating Namespace")
364 var createdResources []helm.KubernetesResource
365 for _, resTempl := range sortedTemplates {
366 resCreated, err := k.CreateKind(resTempl, namespace)
368 return nil, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK)
370 createdResources = append(createdResources, resCreated)
373 return createdResources, nil
376 func (k *KubernetesClient) updateResources(sortedTemplates []helm.KubernetesResourceTemplate,
377 namespace string) ([]helm.KubernetesResource, error) {
379 err := k.ensureNamespace(namespace)
381 return nil, pkgerrors.Wrap(err, "Creating Namespace")
384 var updatedResources []helm.KubernetesResource
385 for _, resTempl := range sortedTemplates {
386 resUpdated, err := k.updateKind(resTempl, namespace)
388 return nil, pkgerrors.Wrapf(err, "Error updating kind: %+v", resTempl.GVK)
390 updatedResources = append(updatedResources, resUpdated)
393 return updatedResources, nil
396 func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespace string) error {
397 log.Warn("Deleting Resource", log.Fields{
399 "resource": resource.Name,
402 pluginImpl, err := plugin.GetPluginByKind(resource.GVK.Kind)
404 return pkgerrors.Wrap(err, "Error loading plugin")
407 err = pluginImpl.Delete(resource, namespace, k)
409 return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
415 func (k *KubernetesClient) deleteResources(resources []helm.KubernetesResource, namespace string) error {
416 //TODO: Investigate if deletion should be in a particular order
417 for _, res := range resources {
418 err := k.DeleteKind(res, namespace)
420 return pkgerrors.Wrap(err, "Deleting resources")
427 //GetMapper returns the RESTMapper that was created for this client
428 func (k *KubernetesClient) GetMapper() meta.RESTMapper {
432 //GetDynamicClient returns the dynamic client that is needed for
433 //unstructured REST calls to the apiserver
434 func (k *KubernetesClient) GetDynamicClient() dynamic.Interface {
435 return k.dynamicClient
438 // GetStandardClient returns the standard client that can be used to handle
439 // standard kubernetes kinds
440 func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
444 //GetInstanceID returns the instanceID that is injected into all the
445 //resources created by the plugin
446 func (k *KubernetesClient) GetInstanceID() string {