Update status check endpoint 
[multicloud/k8s.git] / src / k8splugin / internal / app / instance.go
1 /*
2  * Copyright 2018 Intel Corporation, Inc
3  * Copyright © 2021 Samsung Electronics
4  * Copyright © 2021 Orange
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 package app
20
21 import (
22         "context"
23         "encoding/json"
24
25         appsv1 "k8s.io/api/apps/v1"
26         batchv1 "k8s.io/api/batch/v1"
27         corev1 "k8s.io/api/core/v1"
28         apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
29         "k8s.io/apimachinery/pkg/runtime"
30         "k8s.io/apimachinery/pkg/runtime/schema"
31         "k8s.io/cli-runtime/pkg/resource"
32         "log"
33         "strings"
34         "time"
35
36         "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
37         "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
38         "github.com/onap/multicloud-k8s/src/k8splugin/internal/namegenerator"
39         "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb"
40         "github.com/onap/multicloud-k8s/src/k8splugin/internal/statuscheck"
41
42         pkgerrors "github.com/pkg/errors"
43 )
44
45 // InstanceRequest contains the parameters needed for instantiation
46 // of profiles
47 type InstanceRequest struct {
48         RBName         string            `json:"rb-name"`
49         RBVersion      string            `json:"rb-version"`
50         ProfileName    string            `json:"profile-name"`
51         ReleaseName    string            `json:"release-name"`
52         CloudRegion    string            `json:"cloud-region"`
53         Labels         map[string]string `json:"labels"`
54         OverrideValues map[string]string `json:"override-values"`
55 }
56
57 // InstanceResponse contains the response from instantiation
58 type InstanceResponse struct {
59         ID          string                    `json:"id"`
60         Request     InstanceRequest           `json:"request"`
61         Namespace   string                    `json:"namespace"`
62         ReleaseName string                    `json:"release-name"`
63         Resources   []helm.KubernetesResource `json:"resources"`
64         Hooks       []*helm.Hook              `json:"-"`
65 }
66
67 // InstanceMiniResponse contains the response from instantiation
68 // It does NOT include the created resources.
69 // Use the regular GET to get the created resources for a particular instance
70 type InstanceMiniResponse struct {
71         ID          string          `json:"id"`
72         Request     InstanceRequest `json:"request"`
73         ReleaseName string          `json:"release-name"`
74         Namespace   string          `json:"namespace"`
75 }
76
77 // InstanceStatus is what is returned when status is queried for an instance
78 type InstanceStatus struct {
79         Request         InstanceRequest  `json:"request"`
80         Ready           bool             `json:"ready"`
81         ResourceCount   int32            `json:"resourceCount"`
82         ResourcesStatus []ResourceStatus `json:"resourcesStatus"`
83 }
84
85 // InstanceManager is an interface exposes the instantiation functionality
86 type InstanceManager interface {
87         Create(i InstanceRequest) (InstanceResponse, error)
88         Get(id string) (InstanceResponse, error)
89         Status(id string) (InstanceStatus, error)
90         Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error)
91         List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error)
92         Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error)
93         Delete(id string) error
94 }
95
96 // InstanceKey is used as the primary key in the db
97 type InstanceKey struct {
98         ID string `json:"id"`
99 }
100
101 // We will use json marshalling to convert to string to
102 // preserve the underlying structure.
103 func (dk InstanceKey) String() string {
104         out, err := json.Marshal(dk)
105         if err != nil {
106                 return ""
107         }
108
109         return string(out)
110 }
111
112 // InstanceClient implements the InstanceManager interface
113 // It will also be used to maintain some localized state
114 type InstanceClient struct {
115         storeName string
116         tagInst   string
117 }
118
119 // NewInstanceClient returns an instance of the InstanceClient
120 // which implements the InstanceManager
121 func NewInstanceClient() *InstanceClient {
122         return &InstanceClient{
123                 storeName: "rbdef",
124                 tagInst:   "instance",
125         }
126 }
127
128 // Simplified function to retrieve model data from instance ID
129 func resolveModelFromInstance(instanceID string) (rbName, rbVersion, profileName, releaseName string, err error) {
130         v := NewInstanceClient()
131         resp, err := v.Get(instanceID)
132         if err != nil {
133                 return "", "", "", "", pkgerrors.Wrap(err, "Getting instance")
134         }
135         return resp.Request.RBName, resp.Request.RBVersion, resp.Request.ProfileName, resp.ReleaseName, nil
136 }
137
138 // Create an instance of rb on the cluster  in the database
139 func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
140
141         // Name is required
142         if i.RBName == "" || i.RBVersion == "" || i.ProfileName == "" || i.CloudRegion == "" {
143                 return InstanceResponse{},
144                         pkgerrors.New("RBName, RBversion, ProfileName, CloudRegion are required to create a new instance")
145         }
146
147         //Check if profile exists
148         profile, err := rb.NewProfileClient().Get(i.RBName, i.RBVersion, i.ProfileName)
149         if err != nil {
150                 return InstanceResponse{}, pkgerrors.New("Unable to find Profile to create instance")
151         }
152
153         //Convert override values from map to array of strings of the following format
154         //foo=bar
155         overrideValues := []string{}
156         if i.OverrideValues != nil {
157                 for k, v := range i.OverrideValues {
158                         overrideValues = append(overrideValues, k+"="+v)
159                 }
160         }
161
162         //Execute the kubernetes create command
163         sortedTemplates, hookList, releaseName, err := rb.NewProfileClient().Resolve(i.RBName, i.RBVersion, i.ProfileName, overrideValues, i.ReleaseName)
164         if err != nil {
165                 return InstanceResponse{}, pkgerrors.Wrap(err, "Error resolving helm charts")
166         }
167
168         // TODO: Only generate if id is not provided
169         id := namegenerator.Generate()
170
171         k8sClient := KubernetesClient{}
172         err = k8sClient.Init(i.CloudRegion, id)
173         if err != nil {
174                 return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
175         }
176
177         createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace)
178         if err != nil {
179                 return InstanceResponse{}, pkgerrors.Wrap(err, "Create Kubernetes Resources")
180         }
181
182         //Compose the return response
183         resp := InstanceResponse{
184                 ID:          id,
185                 Request:     i,
186                 Namespace:   profile.Namespace,
187                 ReleaseName: releaseName,
188                 Resources:   createdResources,
189                 Hooks:       hookList,
190         }
191
192         key := InstanceKey{
193                 ID: id,
194         }
195         err = db.DBconn.Create(v.storeName, key, v.tagInst, resp)
196         if err != nil {
197                 return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry")
198         }
199
200         return resp, nil
201 }
202
203 // Get returns the instance for corresponding ID
204 func (v *InstanceClient) Get(id string) (InstanceResponse, error) {
205         key := InstanceKey{
206                 ID: id,
207         }
208         value, err := db.DBconn.Read(v.storeName, key, v.tagInst)
209         if err != nil {
210                 return InstanceResponse{}, pkgerrors.Wrap(err, "Get Instance")
211         }
212
213         //value is a byte array
214         if value != nil {
215                 resp := InstanceResponse{}
216                 err = db.DBconn.Unmarshal(value, &resp)
217                 if err != nil {
218                         return InstanceResponse{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
219                 }
220                 return resp, nil
221         }
222
223         return InstanceResponse{}, pkgerrors.New("Error getting Instance")
224 }
225
226 // Query returns state of instance's filtered resources
227 func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) {
228
229         queryClient := NewQueryClient()
230         //Read the status from the DB
231         key := InstanceKey{
232                 ID: id,
233         }
234         value, err := db.DBconn.Read(v.storeName, key, v.tagInst)
235         if err != nil {
236                 return InstanceStatus{}, pkgerrors.Wrap(err, "Get Instance")
237         }
238         if value == nil { //value is a byte array
239                 return InstanceStatus{}, pkgerrors.New("Status is not available")
240         }
241         resResp := InstanceResponse{}
242         err = db.DBconn.Unmarshal(value, &resResp)
243         if err != nil {
244                 return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
245         }
246
247         resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels, id)
248         if err != nil {
249                 return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources")
250         }
251
252         resp := InstanceStatus{
253                 Request:         resResp.Request,
254                 ResourceCount:   resources.ResourceCount,
255                 ResourcesStatus: resources.ResourcesStatus,
256         }
257         return resp, nil
258 }
259
260 // Status returns the status for the instance
261 func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
262
263         //Read the status from the DB
264         key := InstanceKey{
265                 ID: id,
266         }
267
268         value, err := db.DBconn.Read(v.storeName, key, v.tagInst)
269         if err != nil {
270                 return InstanceStatus{}, pkgerrors.Wrap(err, "Get Instance")
271         }
272
273         //value is a byte array
274         if value == nil {
275                 return InstanceStatus{}, pkgerrors.New("Status is not available")
276         }
277
278         resResp := InstanceResponse{}
279         err = db.DBconn.Unmarshal(value, &resResp)
280         if err != nil {
281                 return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
282         }
283
284         k8sClient := KubernetesClient{}
285         err = k8sClient.Init(resResp.Request.CloudRegion, id)
286         if err != nil {
287                 return InstanceStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
288         }
289
290         cumulatedErrorMsg := make([]string, 0)
291         podsStatus, err := k8sClient.getPodsByLabel(resResp.Namespace)
292         if err != nil {
293                 cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
294         }
295
296         isReady := true
297         generalStatus := make([]ResourceStatus, 0, len(resResp.Resources))
298 Main:
299         for _, oneResource := range resResp.Resources {
300                 for _, pod := range podsStatus {
301                         if oneResource.GVK == pod.GVK && oneResource.Name == pod.Name {
302                                 continue Main //Don't double check pods if someone decided to define pod explicitly in helm chart
303                         }
304                 }
305                 status, err := k8sClient.GetResourceStatus(oneResource, resResp.Namespace)
306                 if err != nil {
307                         cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
308                 } else {
309                         generalStatus = append(generalStatus, status)
310                 }
311
312                 ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status)
313
314                 if !ready || err != nil {
315                         isReady = false
316                         cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
317                         break
318                 }
319         }
320         resp := InstanceStatus{
321                 Request:         resResp.Request,
322                 ResourceCount:   int32(len(generalStatus) + len(podsStatus)),
323                 Ready:           isReady, //FIXME To determine readiness, some parsing of status fields is necessary
324                 ResourcesStatus: append(generalStatus, podsStatus...),
325         }
326
327         if len(cumulatedErrorMsg) != 0 {
328                 err = pkgerrors.New("Getting Resources Status:\n" +
329                         strings.Join(cumulatedErrorMsg, "\n"))
330                 return resp, err
331         }
332         //TODO Filter response content by requested verbosity (brief, ...)?
333
334         return resp, nil
335 }
336
337 func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient KubernetesClient, namespace string, status ResourceStatus) (bool, error){
338         readyChecker := statuscheck.NewReadyChecker(k8sClient.clientSet, statuscheck.PausedAsReady(true), statuscheck.CheckJobs(true))
339         ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60)*time.Second)
340         defer cancel()
341
342         apiVersion, kind := rss.GVK.ToAPIVersionAndKind()
343         log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind)
344         restClient, err := k8sClient.getRestApi(apiVersion)
345         if err != nil {
346                 return false, err
347         }
348         mapper := k8sClient.GetMapper()
349         mapping, err := mapper.RESTMapping(schema.GroupKind{
350                 Group: rss.GVK.Group,
351                 Kind:  rss.GVK.Kind,
352         }, rss.GVK.Version)
353         resourceInfo := resource.Info{
354                 Client:          restClient,
355                 Mapping:         mapping,
356                 Namespace:       namespace,
357                 Name:            rss.Name,
358                 Source:          "",
359                 Object:          nil,
360                 ResourceVersion: "",
361         }
362
363         var parsedRes runtime.Object
364         //TODO: Should we care about different api version for a same kind?
365         switch kind {
366         case "Pod":
367                 parsedRes = new(corev1.Pod)
368         case "Job":
369                 parsedRes = new(batchv1.Job)
370         case "Deployment":
371                 parsedRes = new(appsv1.Deployment)
372         case "PersistentVolumeClaim":
373                 parsedRes = new(corev1.PersistentVolume)
374         case "Service":
375                 parsedRes = new(corev1.Service)
376         case "DaemonSet":
377                 parsedRes = new(appsv1.DaemonSet)
378         case "CustomResourceDefinition":
379                 parsedRes = new(apiextv1.CustomResourceDefinition)
380         case "StatefulSet":
381                 parsedRes = new(appsv1.StatefulSet)
382         case "ReplicationController":
383                 parsedRes = new(corev1.ReplicationController)
384         case "ReplicaSet":
385                 parsedRes = new(appsv1.ReplicaSet)
386         default:
387                 //For not listed resource, consider ready
388                 return true, nil
389         }
390         err = runtime.DefaultUnstructuredConverter.FromUnstructured(status.Status.Object, parsedRes)
391         if err != nil {
392                 return false, err
393         }
394         resourceInfo.Object = parsedRes
395         ready, err := readyChecker.IsReady(ctx, &resourceInfo)
396         return ready, err
397 }
398
399 // List returns the instance for corresponding ID
400 // Empty string returns all
401 func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) {
402
403         dbres, err := db.DBconn.ReadAll(v.storeName, v.tagInst)
404         if err != nil || len(dbres) == 0 {
405                 return []InstanceMiniResponse{}, pkgerrors.Wrap(err, "Listing Instances")
406         }
407
408         var results []InstanceMiniResponse
409
410         for key, value := range dbres {
411                 //value is a byte array
412                 if value != nil {
413                         resp := InstanceResponse{}
414                         err = db.DBconn.Unmarshal(value, &resp)
415                         if err != nil {
416                                 log.Printf("[Instance] Error: %s Unmarshaling Instance: %s", err.Error(), key)
417                         }
418
419                         miniresp := InstanceMiniResponse{
420                                 ID:          resp.ID,
421                                 Request:     resp.Request,
422                                 Namespace:   resp.Namespace,
423                                 ReleaseName: resp.ReleaseName,
424                         }
425
426                         //Filter based on the accepted keys
427                         if len(rbname) != 0 &&
428                                 miniresp.Request.RBName != rbname {
429                                 continue
430                         }
431                         if len(rbversion) != 0 &&
432                                 miniresp.Request.RBVersion != rbversion {
433                                 continue
434                         }
435                         if len(profilename) != 0 &&
436                                 miniresp.Request.ProfileName != profilename {
437                                 continue
438                         }
439
440                         results = append(results, miniresp)
441                 }
442         }
443
444         return results, nil
445 }
446
447 // Find returns the instances that match the given criteria
448 // If version is empty, it will return all instances for a given rbName
449 // If profile is empty, it will return all instances for a given rbName+version
450 // If labelKeys are provided, the results are filtered based on that.
451 // It is an AND operation for labelkeys.
452 func (v *InstanceClient) Find(rbName string, version string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error) {
453         if rbName == "" && len(labelKeys) == 0 {
454                 return []InstanceMiniResponse{}, pkgerrors.New("rbName or labelkeys is required and cannot be empty")
455         }
456
457         responses, err := v.List(rbName, version, profile)
458         if err != nil {
459                 return []InstanceMiniResponse{}, pkgerrors.Wrap(err, "Listing Instances")
460         }
461
462         ret := []InstanceMiniResponse{}
463
464         //filter the list by labelKeys now
465         for _, resp := range responses {
466
467                 add := true
468                 for k, v := range labelKeys {
469                         if resp.Request.Labels[k] != v {
470                                 add = false
471                                 break
472                         }
473                 }
474                 // If label was not found in the response, don't add it
475                 if add {
476                         ret = append(ret, resp)
477                 }
478
479         }
480
481         return ret, nil
482 }
483
484 // Delete the Instance from database
485 func (v *InstanceClient) Delete(id string) error {
486         inst, err := v.Get(id)
487         if err != nil {
488                 return pkgerrors.Wrap(err, "Error getting Instance")
489         }
490
491         k8sClient := KubernetesClient{}
492         err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
493         if err != nil {
494                 return pkgerrors.Wrap(err, "Getting CloudRegion Information")
495         }
496
497         err = k8sClient.deleteResources(inst.Resources, inst.Namespace)
498         if err != nil {
499                 return pkgerrors.Wrap(err, "Deleting Instance Resources")
500         }
501
502         key := InstanceKey{
503                 ID: id,
504         }
505         err = db.DBconn.Delete(v.storeName, key, v.tagInst)
506         if err != nil {
507                 return pkgerrors.Wrap(err, "Delete Instance")
508         }
509
510         return nil
511 }