2 Copyright 2018 Intel Corporation.
3 Copyright © 2021 Samsung Electronics
4 Copyright © 2021 Orange
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
26 "github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
27 "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
28 "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
29 log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
30 "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
32 pkgerrors "github.com/pkg/errors"
33 "k8s.io/apimachinery/pkg/api/meta"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36 "k8s.io/apimachinery/pkg/runtime"
37 "k8s.io/apimachinery/pkg/runtime/schema"
38 "k8s.io/client-go/discovery/cached/disk"
39 "k8s.io/client-go/dynamic"
40 "k8s.io/client-go/kubernetes"
41 "k8s.io/client-go/rest"
42 "k8s.io/client-go/restmapper"
43 "k8s.io/client-go/tools/clientcmd"
46 // KubernetesClient encapsulates the different clients' interfaces
47 // we need when interacting with a Kubernetes cluster
48 type KubernetesClient struct {
49 rawConfig clientcmd.ClientConfig
50 restConfig *rest.Config
51 clientSet kubernetes.Interface
52 dynamicClient dynamic.Interface
53 discoverClient *disk.CachedDiscoveryClient
54 restMapper meta.RESTMapper
58 // ResourceStatus holds Resource Runtime Data
59 type ResourceStatus struct {
60 Name string `json:"name"`
61 GVK schema.GroupVersionKind `json:"GVK"`
62 Status unstructured.Unstructured `json:"status"`
65 func (k *KubernetesClient) getRestApi(apiVersion string) (rest.Interface, error) {
66 //based on kubectl api-versions
68 case "admissionregistration.k8s.io/v1":
69 return k.clientSet.AdmissionregistrationV1().RESTClient(), nil
70 case "admissionregistration.k8s.io/v1beta1":
71 return k.clientSet.AdmissionregistrationV1beta1().RESTClient(), nil
73 return k.clientSet.AppsV1().RESTClient(), nil
75 return k.clientSet.AppsV1beta1().RESTClient(), nil
77 return k.clientSet.AppsV1beta2().RESTClient(), nil
78 case "authentication.k8s.io/v1":
79 return k.clientSet.AuthenticationV1().RESTClient(), nil
80 case "authentication.k8s.io/v1beta1":
81 return k.clientSet.AuthenticationV1beta1().RESTClient(), nil
82 case "authorization.k8s.io/v1":
83 return k.clientSet.AuthorizationV1().RESTClient(), nil
84 case "authorization.k8s.io/v1beta1":
85 return k.clientSet.AuthorizationV1beta1().RESTClient(), nil
86 case "autoscaling/v1":
87 return k.clientSet.AutoscalingV1().RESTClient(), nil
88 case "autoscaling/v2beta1":
89 return k.clientSet.AutoscalingV2beta1().RESTClient(), nil
90 case "autoscaling/v2beta2":
91 return k.clientSet.AutoscalingV2beta2().RESTClient(), nil
93 return k.clientSet.BatchV1().RESTClient(), nil
95 return k.clientSet.BatchV1beta1().RESTClient(), nil
96 case "certificates.k8s.io/v1":
97 return k.clientSet.CertificatesV1().RESTClient(), nil
98 case "certificates.k8s.io/v1beta1":
99 return k.clientSet.CertificatesV1beta1().RESTClient(), nil
100 case "coordination.k8s.io/v1":
101 return k.clientSet.CoordinationV1().RESTClient(), nil
102 case "coordination.k8s.io/v1beta1":
103 return k.clientSet.CoordinationV1beta1().RESTClient(), nil
105 return k.clientSet.CoreV1().RESTClient(), nil
106 case "discovery.k8s.io/v1beta1":
107 return k.clientSet.DiscoveryV1beta1().RESTClient(), nil
108 case "events.k8s.io/v1":
109 return k.clientSet.EventsV1().RESTClient(), nil
110 case "events.k8s.io/v1beta1":
111 return k.clientSet.EventsV1beta1().RESTClient(), nil
112 case "extensions/v1beta1":
113 return k.clientSet.ExtensionsV1beta1().RESTClient(), nil
114 case "flowcontrol.apiserver.k8s.io/v1alpha1":
115 return k.clientSet.FlowcontrolV1alpha1().RESTClient(), nil
116 case "networking.k8s.io/v1":
117 return k.clientSet.NetworkingV1().RESTClient(), nil
118 case "networking.k8s.io/v1beta1":
119 return k.clientSet.NetworkingV1beta1().RESTClient(), nil
120 case "node.k8s.io/v1alpha1":
121 return k.clientSet.NodeV1alpha1().RESTClient(), nil
122 case "node.k8s.io/v1beta1":
123 return k.clientSet.NodeV1beta1().RESTClient(), nil
124 case "policy/v1beta1":
125 return k.clientSet.PolicyV1beta1().RESTClient(), nil
126 case "rbac.authorization.k8s.io/v1":
127 return k.clientSet.RbacV1().RESTClient(), nil
128 case "rbac.authorization.k8s.io/v1alpha1":
129 return k.clientSet.RbacV1alpha1().RESTClient(), nil
130 case "rbac.authorization.k8s.io/v1beta1":
131 return k.clientSet.RbacV1beta1().RESTClient(), nil
132 case "scheduling.k8s.io/v1":
133 return k.clientSet.SchedulingV1().RESTClient(), nil
134 case "scheduling.k8s.io/v1alpha1":
135 return k.clientSet.SchedulingV1alpha1().RESTClient(), nil
136 case "scheduling.k8s.io/v1beta1":
137 return k.clientSet.SchedulingV1beta1().RESTClient(), nil
138 case "storage.k8s.io/v1":
139 return k.clientSet.StorageV1().RESTClient(), nil
140 case "storage.k8s.io/v1alpha1":
141 return k.clientSet.StorageV1alpha1().RESTClient(), nil
142 case "storage.k8s.io/v1beta1":
143 return k.clientSet.StorageV1beta1().RESTClient(), nil
145 return nil, pkgerrors.New("Api version " + apiVersion + " unknown")
149 // getPodsByLabel yields status of all pods under given instance ID
150 func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) {
151 client := k.GetStandardClient().CoreV1().Pods(namespace)
152 listOpts := metav1.ListOptions{
153 LabelSelector: config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID,
155 podList, err := client.List(context.TODO(), listOpts)
157 return nil, pkgerrors.Wrap(err, "Retrieving PodList from cluster")
159 resp := make([]ResourceStatus, 0, len(podList.Items))
160 cumulatedErrorMsg := make([]string, 0)
161 for _, pod := range podList.Items {
162 podContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
164 cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
167 var unstrPod unstructured.Unstructured
168 unstrPod.SetUnstructuredContent(podContent)
169 podStatus := ResourceStatus{
170 Name: unstrPod.GetName(),
171 GVK: schema.FromAPIVersionAndKind("v1", "Pod"),
174 resp = append(resp, podStatus)
176 if len(cumulatedErrorMsg) != 0 {
177 return resp, pkgerrors.New("Converting podContent to unstruct error:\n" +
178 strings.Join(cumulatedErrorMsg, "\n"))
183 func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, namespace string) ([]ResourceStatus, error) {
184 dynClient := k.GetDynamicClient()
185 mapper := k.GetMapper()
186 gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
187 mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
189 return nil, pkgerrors.Wrap(err, "Preparing mapper based on GVK")
192 gvr := mapping.Resource
193 opts := metav1.ListOptions{
194 LabelSelector: labelSelector,
196 var unstrList *unstructured.UnstructuredList
197 switch mapping.Scope.Name() {
198 case meta.RESTScopeNameNamespace:
199 unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts)
200 case meta.RESTScopeNameRoot:
201 unstrList, err = dynClient.Resource(gvr).List(context.TODO(), opts)
203 return nil, pkgerrors.New("Got an unknown RESTScopeName for mapping: " + gvk.String())
206 return nil, pkgerrors.Wrap(err, "Querying for resources")
209 resp := make([]ResourceStatus, 0)
210 for _, unstr := range unstrList.Items {
211 if unstr.GetName() != "" {
212 resp = append(resp, ResourceStatus{unstr.GetName(), gvk, unstr})
218 // GetResourcesStatus yields status of given generic resource
219 func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namespace string) (ResourceStatus, error) {
220 dynClient := k.GetDynamicClient()
221 mapper := k.GetMapper()
222 mapping, err := mapper.RESTMapping(schema.GroupKind{
223 Group: res.GVK.Group,
227 return ResourceStatus{},
228 pkgerrors.Wrap(err, "Preparing mapper based on GVK")
231 gvr := mapping.Resource
232 opts := metav1.GetOptions{}
233 var unstruct *unstructured.Unstructured
234 switch mapping.Scope.Name() {
235 case meta.RESTScopeNameNamespace:
236 unstruct, err = dynClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), res.Name, opts)
237 case meta.RESTScopeNameRoot:
238 unstruct, err = dynClient.Resource(gvr).Get(context.TODO(), res.Name, opts)
240 return ResourceStatus{}, pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + res.GVK.String())
244 return ResourceStatus{}, pkgerrors.Wrap(err, "Getting object status")
247 return ResourceStatus{unstruct.GetName(), res.GVK, *unstruct}, nil
250 // getKubeConfig uses the connectivity client to get the kubeconfig based on the name
251 // of the cloudregion. This is written out to a file.
252 func (k *KubernetesClient) getKubeConfig(cloudregion string) (string, error) {
254 conn := connection.NewConnectionClient()
255 kubeConfigPath, err := conn.Download(cloudregion)
257 return "", pkgerrors.Wrap(err, "Downloading kubeconfig")
260 return kubeConfigPath, nil
263 // Init loads the Kubernetes configuation values stored into the local configuration file
264 func (k *KubernetesClient) Init(cloudregion string, iid string) error {
265 if cloudregion == "" {
266 return pkgerrors.New("Cloudregion is empty")
270 return pkgerrors.New("Instance ID is empty")
275 configPath, err := k.getKubeConfig(cloudregion)
277 return pkgerrors.Wrap(err, "Get kubeconfig file")
280 //Remove kubeconfigfile after the clients are created
281 defer os.Remove(configPath)
283 config, err := clientcmd.BuildConfigFromFlags("", configPath)
285 return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error")
288 k.clientSet, err = kubernetes.NewForConfig(config)
293 k.dynamicClient, err = dynamic.NewForConfig(config)
295 return pkgerrors.Wrap(err, "Creating dynamic client")
298 k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute)
300 return pkgerrors.Wrap(err, "Creating discovery client")
303 k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient)
304 k.restConfig = config
307 kubeFile, err := os.Open(configPath)
309 return pkgerrors.Wrap(err, "Opening kubeConfig")
311 kubeData, err := ioutil.ReadAll(kubeFile)
313 return pkgerrors.Wrap(err, "Reading kubeConfig")
315 k.rawConfig, err = clientcmd.NewClientConfigFromBytes(kubeData)
317 return pkgerrors.Wrap(err, "Creating rawConfig")
323 func (k *KubernetesClient) ensureNamespace(namespace string) error {
325 pluginImpl, err := plugin.GetPluginByKind("Namespace")
327 return pkgerrors.Wrap(err, "Loading Namespace Plugin")
330 ns, err := pluginImpl.Get(helm.KubernetesResource{
332 GVK: schema.GroupVersionKind{
339 // Check for errors getting the namespace while ignoring errors where the namespace does not exist
340 // Error message when namespace does not exist: "namespaces "namespace-name" not found"
341 if err != nil && strings.Contains(err.Error(), "not found") == false {
342 log.Error("Error checking for namespace", log.Fields{
344 "namespace": namespace,
346 return pkgerrors.Wrap(err, "Error checking for namespace: "+namespace)
350 log.Info("Creating Namespace", log.Fields{
351 "namespace": namespace,
354 _, err = pluginImpl.Create("", namespace, k)
356 log.Error("Error Creating Namespace", log.Fields{
358 "namespace": namespace,
360 return pkgerrors.Wrap(err, "Error creating "+namespace+" namespace")
366 func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate,
367 namespace string) (helm.KubernetesResource, error) {
369 if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
370 return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
373 log.Info("Processing Kubernetes Resource", log.Fields{
374 "filepath": resTempl.FilePath,
377 pluginImpl, err := plugin.GetPluginByKind(resTempl.GVK.Kind)
379 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error loading plugin")
382 createdResourceName, err := pluginImpl.Create(resTempl.FilePath, namespace, k)
384 log.Error("Error Creating Resource", log.Fields{
387 "filepath": resTempl.FilePath,
389 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error in plugin "+resTempl.GVK.Kind+" plugin")
392 log.Info("Created Kubernetes Resource", log.Fields{
393 "resource": createdResourceName,
397 return helm.KubernetesResource{
399 Name: createdResourceName,
403 func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate,
404 namespace string) (helm.KubernetesResource, error) {
406 if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
407 return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
410 log.Info("Processing Kubernetes Resource", log.Fields{
411 "filepath": resTempl.FilePath,
414 pluginImpl, err := plugin.GetPluginByKind(resTempl.GVK.Kind)
416 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error loading plugin")
419 updatedResourceName, err := pluginImpl.Update(resTempl.FilePath, namespace, k)
421 log.Error("Error Updating Resource", log.Fields{
424 "filepath": resTempl.FilePath,
426 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error in plugin "+resTempl.GVK.Kind+" plugin")
429 log.Info("Updated Kubernetes Resource", log.Fields{
430 "resource": updatedResourceName,
434 return helm.KubernetesResource{
436 Name: updatedResourceName,
440 func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesResourceTemplate,
441 namespace string) ([]helm.KubernetesResource, error) {
443 err := k.ensureNamespace(namespace)
445 return nil, pkgerrors.Wrap(err, "Creating Namespace")
448 var createdResources []helm.KubernetesResource
449 for _, resTempl := range sortedTemplates {
450 resCreated, err := k.CreateKind(resTempl, namespace)
452 return nil, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK)
454 createdResources = append(createdResources, resCreated)
457 return createdResources, nil
460 func (k *KubernetesClient) updateResources(sortedTemplates []helm.KubernetesResourceTemplate,
461 namespace string) ([]helm.KubernetesResource, error) {
463 err := k.ensureNamespace(namespace)
465 return nil, pkgerrors.Wrap(err, "Creating Namespace")
468 var updatedResources []helm.KubernetesResource
469 for _, resTempl := range sortedTemplates {
470 resUpdated, err := k.updateKind(resTempl, namespace)
472 return nil, pkgerrors.Wrapf(err, "Error updating kind: %+v", resTempl.GVK)
474 updatedResources = append(updatedResources, resUpdated)
477 return updatedResources, nil
480 func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespace string) error {
481 log.Warn("Deleting Resource", log.Fields{
483 "resource": resource.Name,
486 pluginImpl, err := plugin.GetPluginByKind(resource.GVK.Kind)
488 return pkgerrors.Wrap(err, "Error loading plugin")
491 err = pluginImpl.Delete(resource, namespace, k)
493 return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
499 func (k *KubernetesClient) deleteResources(resources []helm.KubernetesResource, namespace string) error {
500 //TODO: Investigate if deletion should be in a particular order
501 for _, res := range resources {
502 err := k.DeleteKind(res, namespace)
504 return pkgerrors.Wrap(err, "Deleting resources")
511 //GetMapper returns the RESTMapper that was created for this client
512 func (k *KubernetesClient) GetMapper() meta.RESTMapper {
516 //GetDynamicClient returns the dynamic client that is needed for
517 //unstructured REST calls to the apiserver
518 func (k *KubernetesClient) GetDynamicClient() dynamic.Interface {
519 return k.dynamicClient
522 // GetStandardClient returns the standard client that can be used to handle
523 // standard kubernetes kinds
524 func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
528 //GetInstanceID returns the instanceID that is injected into all the
529 //resources created by the plugin
530 func (k *KubernetesClient) GetInstanceID() string {