4c5f7e1c1ec4d6ba804350ee99e43afc2f93148d
[multicloud/k8s.git] / src / k8splugin / internal / app / client.go
1 /*
2 Copyright 2018 Intel Corporation.
3 Copyright © 2021 Samsung Electronics
4 Copyright © 2021 Orange
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9     http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package app
18
19 import (
20         "context"
21         "io/ioutil"
22         "os"
23         "strings"
24         "time"
25
26         "github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
27         "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
28         "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
29         log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
30         "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
31
32         pkgerrors "github.com/pkg/errors"
33         "k8s.io/apimachinery/pkg/api/meta"
34         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35         "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36         "k8s.io/apimachinery/pkg/runtime"
37         "k8s.io/apimachinery/pkg/runtime/schema"
38         "k8s.io/client-go/discovery/cached/disk"
39         "k8s.io/client-go/dynamic"
40         "k8s.io/client-go/kubernetes"
41         "k8s.io/client-go/rest"
42         "k8s.io/client-go/restmapper"
43         "k8s.io/client-go/tools/clientcmd"
44 )
45
46 // KubernetesClient encapsulates the different clients' interfaces
47 // we need when interacting with a Kubernetes cluster
48 type KubernetesClient struct {
49         rawConfig      clientcmd.ClientConfig
50         restConfig     *rest.Config
51         clientSet      kubernetes.Interface
52         dynamicClient  dynamic.Interface
53         discoverClient *disk.CachedDiscoveryClient
54         restMapper     meta.RESTMapper
55         instanceID     string
56 }
57
58 // ResourceStatus holds Resource Runtime Data
59 type ResourceStatus struct {
60         Name   string                    `json:"name"`
61         GVK    schema.GroupVersionKind   `json:"GVK"`
62         Status unstructured.Unstructured `json:"status"`
63 }
64
65 // getPodsByLabel yields status of all pods under given instance ID
66 func (k *KubernetesClient) getPodsByLabel(namespace string) ([]ResourceStatus, error) {
67         client := k.GetStandardClient().CoreV1().Pods(namespace)
68         listOpts := metav1.ListOptions{
69                 LabelSelector: config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID,
70         }
71         podList, err := client.List(context.TODO(), listOpts)
72         if err != nil {
73                 return nil, pkgerrors.Wrap(err, "Retrieving PodList from cluster")
74         }
75         resp := make([]ResourceStatus, 0, len(podList.Items))
76         cumulatedErrorMsg := make([]string, 0)
77         for _, pod := range podList.Items {
78                 podContent, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod)
79                 if err != nil {
80                         cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
81                         continue
82                 }
83                 var unstrPod unstructured.Unstructured
84                 unstrPod.SetUnstructuredContent(podContent)
85                 podStatus := ResourceStatus{
86                         Name:   unstrPod.GetName(),
87                         GVK:    schema.FromAPIVersionAndKind("v1", "Pod"),
88                         Status: unstrPod,
89                 }
90                 resp = append(resp, podStatus)
91         }
92         if len(cumulatedErrorMsg) != 0 {
93                 return resp, pkgerrors.New("Converting podContent to unstruct error:\n" +
94                         strings.Join(cumulatedErrorMsg, "\n"))
95         }
96         return resp, nil
97 }
98
99 func (k *KubernetesClient) queryResources(apiVersion, kind, labelSelector, namespace string) ([]ResourceStatus, error) {
100         dynClient := k.GetDynamicClient()
101         mapper := k.GetMapper()
102         gvk := schema.FromAPIVersionAndKind(apiVersion, kind)
103         mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
104         if err != nil {
105                 return nil, pkgerrors.Wrap(err, "Preparing mapper based on GVK")
106         }
107
108         gvr := mapping.Resource
109         opts := metav1.ListOptions{
110                 LabelSelector: labelSelector,
111         }
112         var unstrList *unstructured.UnstructuredList
113         switch mapping.Scope.Name() {
114         case meta.RESTScopeNameNamespace:
115                 unstrList, err = dynClient.Resource(gvr).Namespace(namespace).List(context.TODO(), opts)
116         case meta.RESTScopeNameRoot:
117                 unstrList, err = dynClient.Resource(gvr).List(context.TODO(), opts)
118         default:
119                 return nil, pkgerrors.New("Got an unknown RESTScopeName for mapping: " + gvk.String())
120         }
121         if err != nil {
122                 return nil, pkgerrors.Wrap(err, "Querying for resources")
123         }
124
125         resp := make([]ResourceStatus, 0)
126         for _, unstr := range unstrList.Items {
127                 if unstr.GetName() != "" {
128                         resp = append(resp, ResourceStatus{unstr.GetName(), gvk, unstr})
129                 }
130         }
131         return resp, nil
132 }
133
134 // GetResourcesStatus yields status of given generic resource
135 func (k *KubernetesClient) GetResourceStatus(res helm.KubernetesResource, namespace string) (ResourceStatus, error) {
136         dynClient := k.GetDynamicClient()
137         mapper := k.GetMapper()
138         mapping, err := mapper.RESTMapping(schema.GroupKind{
139                 Group: res.GVK.Group,
140                 Kind:  res.GVK.Kind,
141         }, res.GVK.Version)
142         if err != nil {
143                 return ResourceStatus{},
144                         pkgerrors.Wrap(err, "Preparing mapper based on GVK")
145         }
146
147         gvr := mapping.Resource
148         opts := metav1.GetOptions{}
149         var unstruct *unstructured.Unstructured
150         switch mapping.Scope.Name() {
151         case meta.RESTScopeNameNamespace:
152                 unstruct, err = dynClient.Resource(gvr).Namespace(namespace).Get(context.TODO(), res.Name, opts)
153         case meta.RESTScopeNameRoot:
154                 unstruct, err = dynClient.Resource(gvr).Get(context.TODO(), res.Name, opts)
155         default:
156                 return ResourceStatus{}, pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + res.GVK.String())
157         }
158
159         if err != nil {
160                 return ResourceStatus{}, pkgerrors.Wrap(err, "Getting object status")
161         }
162
163         return ResourceStatus{unstruct.GetName(), res.GVK, *unstruct}, nil
164 }
165
166 // getKubeConfig uses the connectivity client to get the kubeconfig based on the name
167 // of the cloudregion. This is written out to a file.
168 func (k *KubernetesClient) getKubeConfig(cloudregion string) (string, error) {
169
170         conn := connection.NewConnectionClient()
171         kubeConfigPath, err := conn.Download(cloudregion)
172         if err != nil {
173                 return "", pkgerrors.Wrap(err, "Downloading kubeconfig")
174         }
175
176         return kubeConfigPath, nil
177 }
178
179 // Init loads the Kubernetes configuation values stored into the local configuration file
180 func (k *KubernetesClient) Init(cloudregion string, iid string) error {
181         if cloudregion == "" {
182                 return pkgerrors.New("Cloudregion is empty")
183         }
184
185         if iid == "" {
186                 return pkgerrors.New("Instance ID is empty")
187         }
188
189         k.instanceID = iid
190
191         configPath, err := k.getKubeConfig(cloudregion)
192         if err != nil {
193                 return pkgerrors.Wrap(err, "Get kubeconfig file")
194         }
195
196         //Remove kubeconfigfile after the clients are created
197         defer os.Remove(configPath)
198
199         config, err := clientcmd.BuildConfigFromFlags("", configPath)
200         if err != nil {
201                 return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error")
202         }
203
204         k.clientSet, err = kubernetes.NewForConfig(config)
205         if err != nil {
206                 return err
207         }
208
209         k.dynamicClient, err = dynamic.NewForConfig(config)
210         if err != nil {
211                 return pkgerrors.Wrap(err, "Creating dynamic client")
212         }
213
214         k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute)
215         if err != nil {
216                 return pkgerrors.Wrap(err, "Creating discovery client")
217         }
218
219         k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient)
220         k.restConfig = config
221
222         //Spawn ClientConfig
223         kubeFile, err := os.Open(configPath)
224         if err != nil {
225                 return pkgerrors.Wrap(err, "Opening kubeConfig")
226         }
227         kubeData, err := ioutil.ReadAll(kubeFile)
228         if err != nil {
229                 return pkgerrors.Wrap(err, "Reading kubeConfig")
230         }
231         k.rawConfig, err = clientcmd.NewClientConfigFromBytes(kubeData)
232         if err != nil {
233                 return pkgerrors.Wrap(err, "Creating rawConfig")
234         }
235
236         return nil
237 }
238
239 func (k *KubernetesClient) ensureNamespace(namespace string) error {
240
241         pluginImpl, err := plugin.GetPluginByKind("Namespace")
242         if err != nil {
243                 return pkgerrors.Wrap(err, "Loading Namespace Plugin")
244         }
245
246         ns, err := pluginImpl.Get(helm.KubernetesResource{
247                 Name: namespace,
248                 GVK: schema.GroupVersionKind{
249                         Group:   "",
250                         Version: "v1",
251                         Kind:    "Namespace",
252                 },
253         }, namespace, k)
254
255         // Check for errors getting the namespace while ignoring errors where the namespace does not exist
256         // Error message when namespace does not exist: "namespaces "namespace-name" not found"
257         if err != nil && strings.Contains(err.Error(), "not found") == false {
258                 log.Error("Error checking for namespace", log.Fields{
259                         "error":     err,
260                         "namespace": namespace,
261                 })
262                 return pkgerrors.Wrap(err, "Error checking for namespace: "+namespace)
263         }
264
265         if ns == "" {
266                 log.Info("Creating Namespace", log.Fields{
267                         "namespace": namespace,
268                 })
269
270                 _, err = pluginImpl.Create("", namespace, k)
271                 if err != nil {
272                         log.Error("Error Creating Namespace", log.Fields{
273                                 "error":     err,
274                                 "namespace": namespace,
275                         })
276                         return pkgerrors.Wrap(err, "Error creating "+namespace+" namespace")
277                 }
278         }
279         return nil
280 }
281
282 func (k *KubernetesClient) CreateKind(resTempl helm.KubernetesResourceTemplate,
283         namespace string) (helm.KubernetesResource, error) {
284
285         if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
286                 return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
287         }
288
289         log.Info("Processing Kubernetes Resource", log.Fields{
290                 "filepath": resTempl.FilePath,
291         })
292
293         pluginImpl, err := plugin.GetPluginByKind(resTempl.GVK.Kind)
294         if err != nil {
295                 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error loading plugin")
296         }
297
298         createdResourceName, err := pluginImpl.Create(resTempl.FilePath, namespace, k)
299         if err != nil {
300                 log.Error("Error Creating Resource", log.Fields{
301                         "error":    err,
302                         "gvk":      resTempl.GVK,
303                         "filepath": resTempl.FilePath,
304                 })
305                 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error in plugin "+resTempl.GVK.Kind+" plugin")
306         }
307
308         log.Info("Created Kubernetes Resource", log.Fields{
309                 "resource": createdResourceName,
310                 "gvk":      resTempl.GVK,
311         })
312
313         return helm.KubernetesResource{
314                 GVK:  resTempl.GVK,
315                 Name: createdResourceName,
316         }, nil
317 }
318
319 func (k *KubernetesClient) updateKind(resTempl helm.KubernetesResourceTemplate,
320         namespace string) (helm.KubernetesResource, error) {
321
322         if _, err := os.Stat(resTempl.FilePath); os.IsNotExist(err) {
323                 return helm.KubernetesResource{}, pkgerrors.New("File " + resTempl.FilePath + "does not exists")
324         }
325
326         log.Info("Processing Kubernetes Resource", log.Fields{
327                 "filepath": resTempl.FilePath,
328         })
329
330         pluginImpl, err := plugin.GetPluginByKind(resTempl.GVK.Kind)
331         if err != nil {
332                 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error loading plugin")
333         }
334
335         updatedResourceName, err := pluginImpl.Update(resTempl.FilePath, namespace, k)
336         if err != nil {
337                 log.Error("Error Updating Resource", log.Fields{
338                         "error":    err,
339                         "gvk":      resTempl.GVK,
340                         "filepath": resTempl.FilePath,
341                 })
342                 return helm.KubernetesResource{}, pkgerrors.Wrap(err, "Error in plugin "+resTempl.GVK.Kind+" plugin")
343         }
344
345         log.Info("Updated Kubernetes Resource", log.Fields{
346                 "resource": updatedResourceName,
347                 "gvk":      resTempl.GVK,
348         })
349
350         return helm.KubernetesResource{
351                 GVK:  resTempl.GVK,
352                 Name: updatedResourceName,
353         }, nil
354 }
355
356 func (k *KubernetesClient) createResources(sortedTemplates []helm.KubernetesResourceTemplate,
357         namespace string) ([]helm.KubernetesResource, error) {
358
359         err := k.ensureNamespace(namespace)
360         if err != nil {
361                 return nil, pkgerrors.Wrap(err, "Creating Namespace")
362         }
363
364         var createdResources []helm.KubernetesResource
365         for _, resTempl := range sortedTemplates {
366                 resCreated, err := k.CreateKind(resTempl, namespace)
367                 if err != nil {
368                         return nil, pkgerrors.Wrapf(err, "Error creating kind: %+v", resTempl.GVK)
369                 }
370                 createdResources = append(createdResources, resCreated)
371         }
372
373         return createdResources, nil
374 }
375
376 func (k *KubernetesClient) updateResources(sortedTemplates []helm.KubernetesResourceTemplate,
377         namespace string) ([]helm.KubernetesResource, error) {
378
379         err := k.ensureNamespace(namespace)
380         if err != nil {
381                 return nil, pkgerrors.Wrap(err, "Creating Namespace")
382         }
383
384         var updatedResources []helm.KubernetesResource
385         for _, resTempl := range sortedTemplates {
386                 resUpdated, err := k.updateKind(resTempl, namespace)
387                 if err != nil {
388                         return nil, pkgerrors.Wrapf(err, "Error updating kind: %+v", resTempl.GVK)
389                 }
390                 updatedResources = append(updatedResources, resUpdated)
391         }
392
393         return updatedResources, nil
394 }
395
396 func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespace string) error {
397         log.Warn("Deleting Resource", log.Fields{
398                 "gvk":      resource.GVK,
399                 "resource": resource.Name,
400         })
401
402         pluginImpl, err := plugin.GetPluginByKind(resource.GVK.Kind)
403         if err != nil {
404                 return pkgerrors.Wrap(err, "Error loading plugin")
405         }
406
407         err = pluginImpl.Delete(resource, namespace, k)
408         if err != nil {
409                 return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
410         }
411
412         return nil
413 }
414
415 func (k *KubernetesClient) deleteResources(resources []helm.KubernetesResource, namespace string) error {
416         //TODO: Investigate if deletion should be in a particular order
417         for _, res := range resources {
418                 err := k.DeleteKind(res, namespace)
419                 if err != nil {
420                         return pkgerrors.Wrap(err, "Deleting resources")
421                 }
422         }
423
424         return nil
425 }
426
427 //GetMapper returns the RESTMapper that was created for this client
428 func (k *KubernetesClient) GetMapper() meta.RESTMapper {
429         return k.restMapper
430 }
431
432 //GetDynamicClient returns the dynamic client that is needed for
433 //unstructured REST calls to the apiserver
434 func (k *KubernetesClient) GetDynamicClient() dynamic.Interface {
435         return k.dynamicClient
436 }
437
438 // GetStandardClient returns the standard client that can be used to handle
439 // standard kubernetes kinds
440 func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
441         return k.clientSet
442 }
443
444 //GetInstanceID returns the instanceID that is injected into all the
445 //resources created by the plugin
446 func (k *KubernetesClient) GetInstanceID() string {
447         return k.instanceID
448 }