Update status check endpoint 
[multicloud/k8s.git] / src / k8splugin / internal / app / client.go
1 /*
2 Copyright 2018 Intel Corporation.
3 Copyright © 2021 Samsung Electronics
4 Copyright © 2021 Orange
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9     http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package app
18
19 import (
20         "context"
21         "io/ioutil"
22         "os"
23         "strings"
24         "time"
25
26         "github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
27         "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
28         "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
29         log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
30         "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
31
32         pkgerrors "github.com/pkg/errors"
33         "k8s.io/apimachinery/pkg/api/meta"
34         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35         "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36         "k8s.io/apimachinery/pkg/runtime"
37         "k8s.io/apimachinery/pkg/runtime/schema"
38         "k8s.io/client-go/discovery/cached/disk"
39         "k8s.io/client-go/dynamic"
40         "k8s.io/client-go/kubernetes"
41         "k8s.io/client-go/rest"
42         "k8s.io/client-go/restmapper"
43         "k8s.io/client-go/tools/clientcmd"
44 )
45
46 // KubernetesClient encapsulates the different clients' interfaces
47 // we need when interacting with a Kubernetes cluster
48 type KubernetesClient struct {
49         rawConfig      clientcmd.ClientConfig
50         restConfig     *rest.Config
51         clientSet      kubernetes.Interface
52         dynamicClient  dynamic.Interface
53         discoverClient *disk.CachedDiscoveryClient
54         restMapper     meta.RESTMapper
55         instanceID     string
56 }
57
58 // ResourceStatus holds Resource Runtime Data
59 type ResourceStatus struct {
60         Name   string                    `json:"name"`
61         GVK    schema.GroupVersionKind   `json:"GVK"`
62         Status unstructured.Unstructured `json:"status"`
63 }
64
65 func (k *KubernetesClient) getRestApi(apiVersion string) (rest.Interface, error) {
66         //based on kubectl api-versions
67         switch apiVersion {
68         case "admissionregistration.k8s.io/v1":
69                 return k.clientSet.AdmissionregistrationV1().RESTClient(), nil
70         case "admissionregistration.k8s.io/v1beta1":
71                 return k.clientSet.AdmissionregistrationV1beta1().RESTClient(), nil
72         case "apps/v1":
73                 return k.clientSet.AppsV1().RESTClient(), nil
74         case "apps/v1beta1":
75                 return k.clientSet.AppsV1beta1().RESTClient(), nil
76         case "apps/v1beta2":
77                 return k.clientSet.AppsV1beta2().RESTClient(), nil
78         case "authentication.k8s.io/v1":
79                 return k.clientSet.AuthenticationV1().RESTClient(), nil
80         case "authentication.k8s.io/v1beta1":
81                 return k.clientSet.AuthenticationV1beta1().RESTClient(), nil
82         case "authorization.k8s.io/v1":
83                 return k.clientSet.AuthorizationV1().RESTClient(), nil
84         case "authorization.k8s.io/v1beta1":
85                 return k.clientSet.AuthorizationV1beta1().RESTClient(), nil
86         case "autoscaling/v1":
87                 return k.clientSet.AutoscalingV1().RESTClient(), nil
88         case "autoscaling/v2beta1":
89                 return k.clientSet.AutoscalingV2beta1().RESTClient(), nil
90         case "autoscaling/v2beta2":
91                 return k.clientSet.AutoscalingV2beta2().RESTClient(), nil
92         case "batch/v1":
93                 return k.clientSet.BatchV1().RESTClient(), nil
94         case "batch/v1beta1":
95                 return k.clientSet.BatchV1beta1().RESTClient(), nil
96         case "certificates.k8s.io/v1":
97                 return k.clientSet.CertificatesV1().RESTClient(), nil
98         case "certificates.k8s.io/v1beta1":
99                 return k.clientSet.CertificatesV1beta1().RESTClient(), nil
100         case "coordination.k8s.io/v1":
101                 return k.clientSet.CoordinationV1().RESTClient(), nil
102         case "coordination.k8s.io/v1beta1":
103                 return k.clientSet.CoordinationV1beta1().RESTClient(), nil
104         case "v1":
105                 return k.clientSet.CoreV1().RESTClient(), nil
106         case "discovery.k8s.io/v1beta1":
107                 return k.clientSet.DiscoveryV1beta1().RESTClient(), nil
108         case "events.k8s.io/v1":
109                 return k.clientSet.EventsV1().RESTClient(), nil
110         case "events.k8s.io/v1beta1":
111                 return k.clientSet.EventsV1beta1().RESTClient(), nil
112         case "extensions/v1beta1":
113                 return k.clientSet.ExtensionsV1beta1().RESTClient(), nil
114         case "flowcontrol.apiserver.k8s.io/v1alpha1":
115                 return k.clientSet.FlowcontrolV1alpha1().RESTClient(), nil
116         case "networking.k8s.io/v1":
117                 return k.clientSet.NetworkingV1().RESTClient(), nil
118         case "networking.k8s.io/v1beta1":
119                 return k.clientSet.NetworkingV1beta1().RESTClient(), nil
120         case "node.k8s.io/v1alpha1":
121                 return k.clientSet.NodeV1alpha1().RESTClient(), nil
122         case "node.k8s.io/v1beta1":
123                 return k.clientSet.NodeV1beta1().RESTClient(), nil
124         case "policy/v1beta1":
125                 return k.clientSet.PolicyV1beta1().RESTClient(), nil
126         case "rbac.authorization.k8s.io/v1":
127                 return k.clientSet.RbacV1().RESTClient(), nil
128         case "rbac.authorization.k8s.io/v1alpha1":
129                 return k.clientSet.RbacV1alpha1().RESTClient(), nil
130         case "rbac.authorization.k8s.io/v1beta1":
131                 return k.clientSet.RbacV1beta1().RESTClient(), nil
132         case "scheduling.k8s.io/v1":
133                 return k.clientSet.SchedulingV1().RESTClient(), nil
134         case "scheduling.k8s.io/v1alpha1":
135                 return k.clientSet.SchedulingV1alpha1().RESTClient(), nil
136         case "scheduling.k8s.io/v1beta1":
137                 return k.clientSet.SchedulingV1beta1().RESTClient(), nil
138         case "storage.k8s.io/v1":
139                 return k.clientSet.StorageV1().RESTClient(), nil
140         case "storage.k8s.io/v1alpha1":
141                 return k.clientSet.StorageV1alpha1().RESTClient(), nil
142         case "storage.k8s.io/v1beta1":
143                 return k.clientSet.StorageV1beta1().RESTClient(), nil
144         default:
145                 return nil, pkgerrors.New("Api version " + apiVersion + " unknown")
146         }
147 }
148
149 // getPodsByLabel yields status of all pods under given instance ID
150 func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) {
151         client := k.GetStandardClient().CoreV1().Pods(namespace)
152         listOpts := metav1.ListOptions{
153                 LabelSelector: config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID,
154         }
155         podList, err := client.List(context.TODO(), listOpts)
156         if err != nil {
157                 return nil, pkgerrors.Wrap(err, "Retrieving PodList from cluster")
158         }
159         resp := make([]ResourceStatus, 0, len(podList.Items))
160         cumulatedErrorMsg := make([]string, 0)
161         for _, pod := range podList.Items {
162                 podContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
163                 if err != nil {
164                         cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
165                         continue
166                 }
167                 var unstrPod unstructured.Unstructured
168                 unstrPod.SetUnstructuredContent(podContent)
169                 podStatus := ResourceStatus{
170                         Name:   unstrPod.GetName(),
171                         GVK:    schema.FromAPIVersionAndKind("v1", "Pod"),
172                         Status: unstrPod,
173                 }
174                 resp = append(resp, podStatus)
175         }
176         if len(cumulatedErrorMsg) != 0 {
177                 return resp, pkgerrors.New("Converting podContent to unstruct error:\n" +
178                         strings.Join(cumulatedErrorMsg, "\n"))
179         }
180         return resp, nil
181 }
182
183 func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, namespace string) ([]ResourceStatus, error) {
184         dynClient := k.GetDynamicClient()
185         mapper := k.GetMapper()
186         gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
187         mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
188         if err != nil {
189                 return nil, pkgerrors.Wrap(err, "Preparing mapper based on GVK")
190         }
191
192         gvr := mapping.Resource
193         opts := metav1.ListOptions{
194                 LabelSelector: labelSelector,
195         }
196         var unstrList *unstructured.UnstructuredList
197         switch mapping.Scope.Name() {
198         case meta.RESTScopeNameNamespace:
199                 unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts)
200         case meta.RESTScopeNameRoot:
201                 unstrList, err = dynClient.Resource(gvr).List(context.TODO(), opts)
202         default:
203                 return nil, pkgerrors.New("Got an unknown RESTScopeName for mapping: " + gvk.String())
204         }
205         if err != nil {
206                 return nil, pkgerrors.Wrap(err, "Querying for resources")
207         }
208
209         resp := make([]ResourceStatus, 0)
210         for _, unstr := range unstrList.Items {
211                 if unstr.GetName() != "" {
212                         resp = append(resp, ResourceStatus{unstr.GetName(), gvk, unstr})
213                 }
214         }
215         return resp, nil
216 }
217
218 // GetResourcesStatus yields status of given generic resource
219 func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namespace string) (ResourceStatus, error) {
220         dynClient := k.GetDynamicClient()
221         mapper := k.GetMapper()
222         mapping, err := mapper.RESTMapping(schema.GroupKind{
223                 Group: res.GVK.Group,
224                 Kind:  res.GVK.Kind,
225         }, res.GVK.Version)
226         if err != nil {
227                 return ResourceStatus{},
228                         pkgerrors.Wrap(err, "Preparing mapper based on GVK")
229         }
230
231         gvr := mapping.Resource
232         opts := metav1.GetOptions{}
233         var unstruct *unstructured.Unstructured
234         switch mapping.Scope.Name() {
235         case meta.RESTScopeNameNamespace:
236                 unstruct, err = dynClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), res.Name, opts)
237         case meta.RESTScopeNameRoot:
238                 unstruct, err = dynClient.Resource(gvr).Get(context.TODO(), res.Name, opts)
239         default:
240                 return ResourceStatus{}, pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + res.GVK.String())
241         }
242
243         if err != nil {
244                 return ResourceStatus{}, pkgerrors.Wrap(err, "Getting object status")
245         }
246
247         return ResourceStatus{unstruct.GetName(), res.GVK, *unstruct}, nil
248 }
249
250 // getKubeConfig uses the connectivity client to get the kubeconfig based on the name
251 // of the cloudregion. This is written out to a file.
252 func (k *KubernetesClient) getKubeConfig(cloudregion string) (string, error) {
253
254         conn := connection.NewConnectionClient()
255         kubeConfigPath, err := conn.Download(cloudregion)
256         if err != nil {
257                 return "", pkgerrors.Wrap(err, "Downloading kubeconfig")
258         }
259
260         return kubeConfigPath, nil
261 }
262
263 // Init loads the Kubernetes configuation values stored into the local configuration file
264 func (k *KubernetesClient) Init(cloudregion string, iid string) error {
265         if cloudregion == "" {
266                 return pkgerrors.New("Cloudregion is empty")
267         }
268
269         if iid == "" {
270                 return pkgerrors.New("Instance ID is empty")
271         }
272
273         k.instanceID = iid
274
275         configPath, err := k.getKubeConfig(cloudregion)
276         if err != nil {
277                 return pkgerrors.Wrap(err, "Get kubeconfig file")
278         }
279
280         //Remove kubeconfigfile after the clients are created
281         defer os.Remove(configPath)
282
283         config, err := clientcmd.BuildConfigFromFlags("", configPath)
284         if err != nil {
285                 return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error")
286         }
287
288         k.clientSet, err = kubernetes.NewForConfig(config)
289         if err != nil {
290                 return err
291         }
292
293         k.dynamicClient, err = dynamic.NewForConfig(config)
294         if err != nil {
295                 return pkgerrors.Wrap(err, "Creating dynamic client")
296         }
297
298         k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute)
299         if err != nil {
300                 return pkgerrors.Wrap(err, "Creating discovery client")
301         }
302
303         k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient)
304         k.restConfig = config
305
306         //Spawn ClientConfig
307         kubeFile, err := os.Open(configPath)
308         if err != nil {
309                 return pkgerrors.Wrap(err, "Opening kubeConfig")
310         }
311         kubeData, err := ioutil.ReadAll(kubeFile)
312         if err != nil {
313                 return pkgerrors.Wrap(err, "Reading kubeConfig")
314         }
315         k.rawConfig, err = clientcmd.NewClientConfigFromBytes(kubeData)
316         if err != nil {
317                 return pkgerrors.Wrap(err, "Creating rawConfig")
318         }
319
320         return nil
321 }
322
323 func (k *KubernetesClient) ensureNamespace(namespace string) error {
324
325         pluginImpl, err := plugin.GetPluginByKind("Namespace")
326         if err != nil {
327                 return pkgerrors.Wrap(err, "Loading Namespace Plugin")
328         }
329
330         ns, err := pluginImpl.Get(helm.KubernetesResource{
331                 Name: namespace,
332                 GVK: schema.GroupVersionKind{
333                         Group:   "",
334                         Version: "v1",
335                         Kind:    "Namespace",
336                 },
337         }, namespace, k)
338
339         // Check for errors getting the namespace while ignoring errors where the namespace does not exist
340         // Error message when namespace does not exist: "namespaces "namespace-name" not found"
341         if err != nil && strings.Contains(err.Error(), "not found") == false {
342                 log.Error("Error checking for namespace", log.Fields{
343                         "error":     err,
344                         "namespace": namespace,
345                 })
346                 return pkgerrors.Wrap(err, "Error checking for namespace: "+namespace)
347         }
348
349         if ns == "" {
350                 log.Info("Creating Namespace", log.Fields{
351                         "namespace": namespace,
352                 })
353
354                 _, err = pluginImpl.Create("", namespace, k)
355                 if err != nil {
356                         log.Error("Error Creating Namespace", log.Fields{
357                                 "error":     err,
358                                 "namespace": namespace,
359                         })
360                         return pkgerrors.Wrap(err, "Error creating "+namespace+" namespace")
361                 }
362         }
363         return nil
364 }
365
366 func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate,
367         namespace string) (helm.KubernetesResource, error) {
368
369         if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
370                 return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
371         }
372
373         log.Info("Processing Kubernetes Resource", log.Fields{
374                 "filepath": resTempl.FilePath,
375         })
376
377         pluginImpl, err := plugin.GetPluginByKind(resTempl.GVK.Kind)
378         if err != nil {
379                 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error loading plugin")
380         }
381
382         createdResourceName, err := pluginImpl.Create(resTempl.FilePath, namespace, k)
383         if err != nil {
384                 log.Error("Error Creating Resource", log.Fields{
385                         "error":    err,
386                         "gvk":      resTempl.GVK,
387                         "filepath": resTempl.FilePath,
388                 })
389                 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error in plugin "+resTempl.GVK.Kind+" plugin")
390         }
391
392         log.Info("Created Kubernetes Resource", log.Fields{
393                 "resource": createdResourceName,
394                 "gvk":      resTempl.GVK,
395         })
396
397         return helm.KubernetesResource{
398                 GVK:  resTempl.GVK,
399                 Name: createdResourceName,
400         }, nil
401 }
402
403 func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate,
404         namespace string) (helm.KubernetesResource, error) {
405
406         if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
407                 return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
408         }
409
410         log.Info("Processing Kubernetes Resource", log.Fields{
411                 "filepath": resTempl.FilePath,
412         })
413
414         pluginImpl, err := plugin.GetPluginByKind(resTempl.GVK.Kind)
415         if err != nil {
416                 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error loading plugin")
417         }
418
419         updatedResourceName, err := pluginImpl.Update(resTempl.FilePath, namespace, k)
420         if err != nil {
421                 log.Error("Error Updating Resource", log.Fields{
422                         "error":    err,
423                         "gvk":      resTempl.GVK,
424                         "filepath": resTempl.FilePath,
425                 })
426                 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error in plugin "+resTempl.GVK.Kind+" plugin")
427         }
428
429         log.Info("Updated Kubernetes Resource", log.Fields{
430                 "resource": updatedResourceName,
431                 "gvk":      resTempl.GVK,
432         })
433
434         return helm.KubernetesResource{
435                 GVK:  resTempl.GVK,
436                 Name: updatedResourceName,
437         }, nil
438 }
439
440 func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesResourceTemplate,
441         namespace string) ([]helm.KubernetesResource, error) {
442
443         err := k.ensureNamespace(namespace)
444         if err != nil {
445                 return nil, pkgerrors.Wrap(err, "Creating Namespace")
446         }
447
448         var createdResources []helm.KubernetesResource
449         for _, resTempl := range sortedTemplates {
450                 resCreated, err := k.CreateKind(resTempl, namespace)
451                 if err != nil {
452                         return nil, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK)
453                 }
454                 createdResources = append(createdResources, resCreated)
455         }
456
457         return createdResources, nil
458 }
459
460 func (k *KubernetesClient) updateResources(sortedTemplates []helm.KubernetesResourceTemplate,
461         namespace string) ([]helm.KubernetesResource, error) {
462
463         err := k.ensureNamespace(namespace)
464         if err != nil {
465                 return nil, pkgerrors.Wrap(err, "Creating Namespace")
466         }
467
468         var updatedResources []helm.KubernetesResource
469         for _, resTempl := range sortedTemplates {
470                 resUpdated, err := k.updateKind(resTempl, namespace)
471                 if err != nil {
472                         return nil, pkgerrors.Wrapf(err, "Error updating kind: %+v", resTempl.GVK)
473                 }
474                 updatedResources = append(updatedResources, resUpdated)
475         }
476
477         return updatedResources, nil
478 }
479
480 func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespace string) error {
481         log.Warn("Deleting Resource", log.Fields{
482                 "gvk":      resource.GVK,
483                 "resource": resource.Name,
484         })
485
486         pluginImpl, err := plugin.GetPluginByKind(resource.GVK.Kind)
487         if err != nil {
488                 return pkgerrors.Wrap(err, "Error loading plugin")
489         }
490
491         err = pluginImpl.Delete(resource, namespace, k)
492         if err != nil {
493                 return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
494         }
495
496         return nil
497 }
498
499 func (k *KubernetesClient) deleteResources(resources []helm.KubernetesResource, namespace string) error {
500         //TODO: Investigate if deletion should be in a particular order
501         for _, res := range resources {
502                 err := k.DeleteKind(res, namespace)
503                 if err != nil {
504                         return pkgerrors.Wrap(err, "Deleting resources")
505                 }
506         }
507
508         return nil
509 }
510
511 //GetMapper returns the RESTMapper that was created for this client
512 func (k *KubernetesClient) GetMapper() meta.RESTMapper {
513         return k.restMapper
514 }
515
516 //GetDynamicClient returns the dynamic client that is needed for
517 //unstructured REST calls to the apiserver
518 func (k *KubernetesClient) GetDynamicClient() dynamic.Interface {
519         return k.dynamicClient
520 }
521
522 // GetStandardClient returns the standard client that can be used to handle
523 // standard kubernetes kinds
524 func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
525         return k.clientSet
526 }
527
528 //GetInstanceID returns the instanceID that is injected into all the
529 //resources created by the plugin
530 func (k *KubernetesClient) GetInstanceID() string {
531         return k.instanceID
532 }