2 * Copyright 2018 Intel Corporation, Inc
3 * Copyright © 2021 Samsung Electronics
4 * Copyright © 2021 Orange
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
25 appsv1 "k8s.io/api/apps/v1"
26 batchv1 "k8s.io/api/batch/v1"
27 corev1 "k8s.io/api/core/v1"
28 apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/runtime/schema"
31 "k8s.io/cli-runtime/pkg/resource"
36 "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
37 "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
38 "github.com/onap/multicloud-k8s/src/k8splugin/internal/namegenerator"
39 "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb"
40 "github.com/onap/multicloud-k8s/src/k8splugin/internal/statuscheck"
42 pkgerrors "github.com/pkg/errors"
45 // InstanceRequest contains the parameters needed for instantiation.
// Create rejects a request whose RBName, RBVersion, ProfileName or
// CloudRegion is empty.
47 type InstanceRequest struct {
48 RBName string `json:"rb-name"`
49 RBVersion string `json:"rb-version"`
50 ProfileName string `json:"profile-name"`
// ReleaseName is handed to the profile Resolve call made by Create.
51 ReleaseName string `json:"release-name"`
52 CloudRegion string `json:"cloud-region"`
// Labels are compared against the labelKeys filter in Find.
53 Labels map[string]string `json:"labels"`
// OverrideValues are flattened by Create into "key=value" strings.
54 OverrideValues map[string]string `json:"override-values"`
57 // InstanceResponse contains the response from instantiation.
// It is also the value that Create writes to the db and that Get, Query and
// Status decode back from it.
58 type InstanceResponse struct {
60 Request InstanceRequest `json:"request"`
61 Namespace string `json:"namespace"`
62 ReleaseName string `json:"release-name"`
63 Resources []helm.KubernetesResource `json:"resources"`
// Hooks is tagged json:"-", so it is left out of the JSON encoding.
64 Hooks []*helm.Hook `json:"-"`
67 // InstanceMiniResponse is the abbreviated form of InstanceResponse returned
// by List and Find.
68 // It does NOT include the created resources.
69 // Use the regular GET to get the created resources for a particular instance.
70 type InstanceMiniResponse struct {
72 Request InstanceRequest `json:"request"`
73 ReleaseName string `json:"release-name"`
74 Namespace string `json:"namespace"`
77 // InstanceStatus is what is returned when status is queried for an instance.
// It is produced by both Status and Query.
78 type InstanceStatus struct {
79 Request InstanceRequest `json:"request"`
80 Ready bool `json:"ready"`
// ResourceCount is the number of entries reported in ResourcesStatus
// (Status sets it to the combined count of generic resources and pods).
81 ResourceCount int32 `json:"resourceCount"`
82 ResourcesStatus []ResourceStatus `json:"resourcesStatus"`
85 // InstanceManager is an interface that exposes the instantiation functionality.
// InstanceClient is the implementation provided in this file.
86 type InstanceManager interface {
87 Create(i InstanceRequest) (InstanceResponse, error)
88 Get(id string) (InstanceResponse, error)
89 Status(id string) (InstanceStatus, error)
90 Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error)
91 List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error)
92 Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error)
93 Delete(id string) error
96 // InstanceKey is used as the primary key in the db.
// Its String method renders it as JSON.
97 type InstanceKey struct {
101 // String converts the key to a string. JSON marshalling is used so that
102 // the underlying structure of the key is preserved.
103 func (dk InstanceKey) String() string {
104 out, err := json.Marshal(dk)
112 // InstanceClient implements the InstanceManager interface.
113 // It will also be used to maintain some localized state; its methods read
// the storeName and tagInst fields when talking to the db.
114 type InstanceClient struct {
119 // NewInstanceClient returns a pointer to a newly built InstanceClient,
120 // which implements the InstanceManager interface.
121 func NewInstanceClient() *InstanceClient {
122 return &InstanceClient{
128 // resolveModelFromInstance is a simplified helper that retrieves model data
// from an instance ID. It loads the instance through a fresh InstanceClient
// and returns the RBName, RBVersion and ProfileName of the stored request.
// The release name comes from the stored response (resp.ReleaseName), not
// from the original request. A lookup failure is wrapped with
// "Getting instance" and all string results are empty.
129 func resolveModelFromInstance(instanceID string) (rbName, rbVersion, profileName, releaseName string, err error) {
130 v := NewInstanceClient()
131 resp, err := v.Get(instanceID)
133 return "", "", "", "", pkgerrors.Wrap(err, "Getting instance")
135 return resp.Request.RBName, resp.Request.RBVersion, resp.Request.ProfileName, resp.ReleaseName, nil
138 // Create instantiates the rb described by i on the cluster and records the
// resulting instance in the database.
//
// RBName, RBVersion, ProfileName and CloudRegion must all be non-empty.
// The steps are: look up the profile, resolve the helm charts with the
// override values, generate an instance id, create the Kubernetes resources
// in the profile's namespace, and store the response in the db.
// On any failure an empty InstanceResponse is returned with the error.
139 func (v *InstanceClient) Create(i InstanceRequest) (InstanceResponse, error) {
142 if i.RBName == "" || i.RBVersion == "" || i.ProfileName == "" || i.CloudRegion == "" {
143 return InstanceResponse{},
144 pkgerrors.New("RBName, RBversion, ProfileName, CloudRegion are required to create a new instance")
147 //Check if profile exists
148 profile, err := rb.NewProfileClient().Get(i.RBName, i.RBVersion, i.ProfileName)
// NOTE(review): the error built here is a new one; the cause returned by
// the profile lookup is not included in it.
150 return InstanceResponse{}, pkgerrors.New("Unable to find Profile to create instance")
153 //Convert override values from map to array of strings of the following format
// key=value. Go does not specify map iteration order, so the order of the
// resulting entries can differ between calls.
155 overrideValues := []string{}
156 if i.OverrideValues != nil {
// The loop variable v shadows the method receiver inside this loop.
157 for k, v := range i.OverrideValues {
158 overrideValues = append(overrideValues, k+"="+v)
162 //Execute the kubernetes create command
163 sortedTemplates, hookList, releaseName, err := rb.NewProfileClient().Resolve(i.RBName, i.RBVersion, i.ProfileName, overrideValues, i.ReleaseName)
165 return InstanceResponse{}, pkgerrors.Wrap(err, "Error resolving helm charts")
168 // TODO: Only generate if id is not provided
169 id := namegenerator.Generate()
171 k8sClient := KubernetesClient{}
172 err = k8sClient.Init(i.CloudRegion, id)
174 return InstanceResponse{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
177 createdResources, err := k8sClient.createResources(sortedTemplates, profile.Namespace)
179 return InstanceResponse{}, pkgerrors.Wrap(err, "Create Kubernetes Resources")
182 //Compose the return response
183 resp := InstanceResponse{
186 Namespace: profile.Namespace,
187 ReleaseName: releaseName,
188 Resources: createdResources,
// NOTE(review): the resources have already been created on the cluster at
// this point; no cleanup is visible here if the db write fails - confirm.
195 err = db.DBconn.Create(v.storeName, key, v.tagInst, resp)
197 return InstanceResponse{}, pkgerrors.Wrap(err, "Creating Instance DB Entry")
203 // Get returns the instance stored in the db for the corresponding ID.
// A read failure is wrapped with "Get Instance" and a decode failure with
// "Unmarshaling Instance Value"; in every error case the returned
// InstanceResponse is empty.
204 func (v *InstanceClient) Get(id string) (InstanceResponse, error) {
208 value, err := db.DBconn.Read(v.storeName, key, v.tagInst)
210 return InstanceResponse{}, pkgerrors.Wrap(err, "Get Instance")
213 //value is a byte array
215 resp := InstanceResponse{}
216 err = db.DBconn.Unmarshal(value, &resp)
218 return InstanceResponse{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
// Fallback error; presumably reached when the db returned no value for the
// key - TODO confirm against the surrounding condition.
223 return InstanceResponse{}, pkgerrors.New("Error getting Instance")
226 // Query returns the state of the instance's filtered resources.
// The stored instance is read from the db to obtain its namespace and cloud
// region; apiVersion, kind, name and labels are passed through to the query
// client unchanged. The Ready field of the result is not set here.
227 func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) {
229 queryClient := NewQueryClient()
230 //Read the status from the DB
234 value, err := db.DBconn.Read(v.storeName, key, v.tagInst)
236 return InstanceStatus{}, pkgerrors.Wrap(err, "Get Instance")
238 if value == nil { //value is a byte array
239 return InstanceStatus{}, pkgerrors.New("Status is not available")
241 resResp := InstanceResponse{}
242 err = db.DBconn.Unmarshal(value, &resResp)
244 return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
247 resources, err := queryClient.Query(resResp.Namespace, resResp.Request.CloudRegion, apiVersion, kind, name, labels, id)
249 return InstanceStatus{}, pkgerrors.Wrap(err, "Querying Resources")
252 resp := InstanceStatus{
253 Request: resResp.Request,
254 ResourceCount: resources.ResourceCount,
255 ResourcesStatus: resources.ResourcesStatus,
260 // Status returns the status for the instance.
// It loads the stored instance from the db, connects to the instance's cloud
// region, collects the status of pods found by label plus the status of every
// stored resource, and runs a readiness check on each resource. Errors met
// while collecting are accumulated and reported together as one error whose
// message starts with "Getting Resources Status:".
261 func (v *InstanceClient) Status(id string) (InstanceStatus, error) {
263 //Read the status from the DB
268 value, err := db.DBconn.Read(v.storeName, key, v.tagInst)
270 return InstanceStatus{}, pkgerrors.Wrap(err, "Get Instance")
273 //value is a byte array
275 return InstanceStatus{}, pkgerrors.New("Status is not available")
278 resResp := InstanceResponse{}
279 err = db.DBconn.Unmarshal(value, &resResp)
281 return InstanceStatus{}, pkgerrors.Wrap(err, "Unmarshaling Instance Value")
284 k8sClient := KubernetesClient{}
285 err = k8sClient.Init(resResp.Request.CloudRegion, id)
287 return InstanceStatus{}, pkgerrors.Wrap(err, "Getting CloudRegion Information")
// From here on errors are collected rather than returned immediately.
290 cumulatedErrorMsg := make([]string, 0)
291 podsStatus, err := k8sClient.getPodsByLabel(resResp.Namespace)
293 cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
297 generalStatus := make([]ResourceStatus, 0, len(resResp.Resources))
299 for _, oneResource := range resResp.Resources {
300 for _, pod := range podsStatus {
301 if oneResource.GVK == pod.GVK && oneResource.Name == pod.Name {
302 continue Main //Don't double check pods if someone decided to define pod explicitly in helm chart
305 status, err := k8sClient.GetResourceStatus(oneResource, resResp.Namespace)
307 cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
309 generalStatus = append(generalStatus, status)
312 ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status)
314 if !ready || err != nil {
// NOTE(review): this branch is also entered when ready is false and err is
// nil; calling err.Error() on a nil error panics - confirm a nil check
// guards the next line.
316 cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error())
320 resp := InstanceStatus{
321 Request: resResp.Request,
322 ResourceCount: int32(len(generalStatus) + len(podsStatus)),
323 Ready: isReady, //FIXME To determine readiness, some parsing of status fields is necessary
324 ResourcesStatus: append(generalStatus, podsStatus...),
327 if len(cumulatedErrorMsg) != 0 {
328 err = pkgerrors.New("Getting Resources Status:\n" +
329 strings.Join(cumulatedErrorMsg, "\n"))
332 //TODO Filter response content by requested verbosity (brief, ...)?
// checkRssStatus reports whether a single resource of an instance is ready.
// It builds a ready checker from the cluster client set, decodes the
// unstructured status object into the typed Kubernetes object matching the
// resource kind, and asks the checker about it under a 60 second timeout.
// Kinds that are not listed in the switch are considered ready.
337 func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient KubernetesClient, namespace string, status ResourceStatus) (bool, error){
338 readyChecker := statuscheck.NewReadyChecker(k8sClient.clientSet, statuscheck.PausedAsReady(true), statuscheck.CheckJobs(true))
339 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60)*time.Second)
342 apiVersion, kind := rss.GVK.ToAPIVersionAndKind()
343 log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind)
344 restClient, err := k8sClient.getRestApi(apiVersion)
348 mapper := k8sClient.GetMapper()
// NOTE(review): no check of the error returned by RESTMapping is visible
// before err is reassigned further down - confirm.
349 mapping, err := mapper.RESTMapping(schema.GroupKind{
350 Group: rss.GVK.Group,
353 resourceInfo := resource.Info{
356 Namespace: namespace,
363 var parsedRes runtime.Object
364 //TODO: Should we care about different api version for a same kind?
367 parsedRes = new(corev1.Pod)
369 parsedRes = new(batchv1.Job)
371 parsedRes = new(appsv1.Deployment)
372 case "PersistentVolumeClaim":
// A claim must be decoded into the claim type; decoding it into
// corev1.PersistentVolume hands the ready checker the wrong object type.
373 parsedRes = new(corev1.PersistentVolumeClaim)
375 parsedRes = new(corev1.Service)
377 parsedRes = new(appsv1.DaemonSet)
378 case "CustomResourceDefinition":
379 parsedRes = new(apiextv1.CustomResourceDefinition)
381 parsedRes = new(appsv1.StatefulSet)
382 case "ReplicationController":
383 parsedRes = new(corev1.ReplicationController)
385 parsedRes = new(appsv1.ReplicaSet)
387 //For not listed resource, consider ready
// Convert the unstructured status content into the typed object chosen above.
390 err = runtime.DefaultUnstructuredConverter.FromUnstructured(status.Status.Object, parsedRes)
394 resourceInfo.Object = parsedRes
395 ready, err := readyChecker.IsReady(ctx, &resourceInfo)
399 // List returns the stored instances whose request matches the given
// rbname, rbversion and profilename.
400 // An empty string for a filter matches all values of that field.
// Entries that fail to decode are logged and do not abort the listing.
401 func (v *InstanceClient) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) {
403 dbres, err := db.DBconn.ReadAll(v.storeName, v.tagInst)
// pkgerrors.Wrap returns nil when err is nil, so an empty db result yields
// an empty slice together with a nil error.
404 if err != nil || len(dbres) == 0 {
405 return []InstanceMiniResponse{}, pkgerrors.Wrap(err, "Listing Instances")
408 var results []InstanceMiniResponse
410 for key, value := range dbres {
411 //value is a byte array
413 resp := InstanceResponse{}
414 err = db.DBconn.Unmarshal(value, &resp)
416 log.Printf("[Instance] Error: %s Unmarshaling Instance: %s", err.Error(), key)
419 miniresp := InstanceMiniResponse{
421 Request: resp.Request,
422 Namespace: resp.Namespace,
423 ReleaseName: resp.ReleaseName,
426 //Filter based on the accepted keys
427 if len(rbname) != 0 &&
428 miniresp.Request.RBName != rbname {
431 if len(rbversion) != 0 &&
432 miniresp.Request.RBVersion != rbversion {
435 if len(profilename) != 0 &&
436 miniresp.Request.ProfileName != profilename {
440 results = append(results, miniresp)
447 // Find returns the instances that match the given criteria.
448 // If version is empty, it will return all instances for a given rbName.
449 // If profile is empty, it will return all instances for a given rbName+version.
450 // If labelKeys are provided, the results are filtered based on that.
451 // It is an AND operation for labelkeys.
// At least one of rbName and labelKeys must be non-empty. The rbName,
// version and profile filtering is delegated to List.
452 func (v *InstanceClient) Find(rbName string, version string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error) {
453 if rbName == "" && len(labelKeys) == 0 {
454 return []InstanceMiniResponse{}, pkgerrors.New("rbName or labelkeys is required and cannot be empty")
457 responses, err := v.List(rbName, version, profile)
459 return []InstanceMiniResponse{}, pkgerrors.Wrap(err, "Listing Instances")
462 ret := []InstanceMiniResponse{}
464 //filter the list by labelKeys now
465 for _, resp := range responses {
// The loop variable v shadows the method receiver inside this loop. A label
// missing from the request reads as "" from the map and is compared as such.
468 for k, v := range labelKeys {
469 if resp.Request.Labels[k] != v {
474 // If label was not found in the response, don't add it
476 ret = append(ret, resp)
484 // Delete removes the instance's resources from the cluster and then deletes
// the Instance from the database.
// The instance is loaded with Get first; the cluster resources are deleted
// before the db entry, so a failure while deleting resources returns early
// and leaves the db entry in place.
485 func (v *InstanceClient) Delete(id string) error {
486 inst, err := v.Get(id)
488 return pkgerrors.Wrap(err, "Error getting Instance")
491 k8sClient := KubernetesClient{}
492 err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
494 return pkgerrors.Wrap(err, "Getting CloudRegion Information")
497 err = k8sClient.deleteResources(inst.Resources, inst.Namespace)
499 return pkgerrors.Wrap(err, "Deleting Instance Resources")
505 err = db.DBconn.Delete(v.storeName, key, v.tagInst)
507 return pkgerrors.Wrap(err, "Delete Instance")