Move Day2 Config Values API to new endpoint
[multicloud/k8s.git] / src / k8splugin / internal / app / config_backend.go
1 /*
2  * Copyright 2018 Intel Corporation, Inc
3  * Copyright © 2021 Samsung Electronics
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 package app
19
20 import (
21         "bytes"
22         "encoding/json"
23         "io/ioutil"
24         "log"
25         "path/filepath"
26         "strconv"
27         "strings"
28         "sync"
29         "time"
30
31         "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
32         "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
33         "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb"
34
35         "github.com/ghodss/yaml"
36         pkgerrors "github.com/pkg/errors"
37 )
38
39 //ConfigStore contains the values that will be stored in the database
40 type configVersionDBContent struct {
41         ConfigNew  Config `json:"config-new"`
42         ConfigPrev Config `json:"config-prev"`
43         Action     string `json:"action"` // CRUD opration for this config
44 }
45
46 //ConfigStore to Store the Config
47 type ConfigStore struct {
48         instanceID string
49         configName string
50 }
51
52 //ConfigVersionStore to Store the Versions of the Config
53 type ConfigVersionStore struct {
54         instanceID string
55 }
56
57 type configResourceList struct {
58         resourceTemplates []helm.KubernetesResourceTemplate
59         createdResources  []helm.KubernetesResource
60         profile           rb.Profile
61         action            string
62 }
63
64 type profileDataManager struct {
65         profileLockMap  map[string]*sync.Mutex
66         resourceChannel map[string](chan configResourceList)
67         sync.Mutex
68 }
69
70 const (
71         storeName  = "config"
72         tagCounter = "counter"
73         tagVersion = "configversion"
74         tagConfig  = "configdata"
75 )
76
77 var profileData = profileDataManager{
78         profileLockMap:  map[string]*sync.Mutex{},
79         resourceChannel: map[string]chan configResourceList{},
80 }
81
82 // Construct key for storing data
83 func constructKey(strs ...string) string {
84
85         var sb strings.Builder
86         sb.WriteString("onapk8s")
87         sb.WriteString("/")
88         sb.WriteString(storeName)
89         sb.WriteString("/")
90         for _, str := range strs {
91                 sb.WriteString(str)
92                 sb.WriteString("/")
93         }
94         return sb.String()
95
96 }
97
98 // Create an entry for the config in the database
99 func (c ConfigStore) createConfig(p Config) error {
100
101         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
102         if err != nil {
103                 return pkgerrors.Wrap(err, "Retrieving model info")
104         }
105         cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig, p.ConfigName)
106         _, err = db.Etcd.Get(cfgKey)
107         if err == nil {
108                 return pkgerrors.Wrap(err, "Config DB Entry Already exists")
109         }
110         configValue, err := db.Serialize(p)
111         if err != nil {
112                 return pkgerrors.Wrap(err, "Serialize Config Value")
113         }
114         err = db.Etcd.Put(cfgKey, configValue)
115         if err != nil {
116                 return pkgerrors.Wrap(err, "Config DB Entry")
117         }
118         return nil
119 }
120
121 // Update the config entry in the database. Updates with the new value
122 // Returns the previous value of the Config
123 func (c ConfigStore) updateConfig(p Config) (Config, error) {
124
125         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
126         if err != nil {
127                 return Config{}, pkgerrors.Wrap(err, "Retrieving model info")
128         }
129         cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig, p.ConfigName)
130         value, err := db.Etcd.Get(cfgKey)
131         configPrev := Config{}
132         if err == nil {
133                 // If updating Config after rollback then previous config may not exist
134                 err = db.DeSerialize(string(value), &configPrev)
135                 if err != nil {
136                         return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value")
137                 }
138         }
139         configValue, err := db.Serialize(p)
140         if err != nil {
141                 return Config{}, pkgerrors.Wrap(err, "Serialize Config Value")
142         }
143         err = db.Etcd.Put(cfgKey, configValue)
144         if err != nil {
145                 return Config{}, pkgerrors.Wrap(err, "Config DB Entry")
146         }
147         return configPrev, nil
148 }
149
150 // Read the config entry in the database
151 func (c ConfigStore) getConfig() (Config, error) {
152         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
153         if err != nil {
154                 return Config{}, pkgerrors.Wrap(err, "Retrieving model info")
155         }
156         cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig, c.configName)
157         value, err := db.Etcd.Get(cfgKey)
158         if err != nil {
159                 return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
160         }
161         //value is a byte array
162         if value != nil {
163                 cfg := Config{}
164                 err = db.DeSerialize(string(value), &cfg)
165                 if err != nil {
166                         return Config{}, pkgerrors.Wrap(err, "Unmarshaling Config Value")
167                 }
168                 return cfg, nil
169         }
170         return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
171 }
172
173 // Delete the config entry in the database
174 func (c ConfigStore) deleteConfig() (Config, error) {
175
176         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
177         if err != nil {
178                 return Config{}, pkgerrors.Wrap(err, "Retrieving model info")
179         }
180         cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig, c.configName)
181         value, err := db.Etcd.Get(cfgKey)
182         if err != nil {
183                 return Config{}, pkgerrors.Wrap(err, "Config DB Entry Not found")
184         }
185         configPrev := Config{}
186         err = db.DeSerialize(string(value), &configPrev)
187         if err != nil {
188                 return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value")
189         }
190
191         err = db.Etcd.Delete(cfgKey)
192         if err != nil {
193                 return Config{}, pkgerrors.Wrap(err, "Config DB Entry")
194         }
195         return configPrev, nil
196 }
197
198 // Create a version for the configuration. If previous config provided that is also stored
199 func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) {
200
201         version, err := c.incrementVersion()
202
203         if err != nil {
204                 return 0, pkgerrors.Wrap(err, "Get Next Version")
205         }
206         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
207         if err != nil {
208                 return 0, pkgerrors.Wrap(err, "Retrieving model info")
209         }
210         versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version)))
211
212         var cs configVersionDBContent
213         cs.Action = action
214         cs.ConfigNew = configNew
215         cs.ConfigPrev = configPrev
216
217         configValue, err := db.Serialize(cs)
218         if err != nil {
219                 return 0, pkgerrors.Wrap(err, "Serialize Config Value")
220         }
221         err = db.Etcd.Put(versionKey, configValue)
222         if err != nil {
223                 return 0, pkgerrors.Wrap(err, "Create Config DB Entry")
224         }
225         return version, nil
226 }
227
228 // Delete current version of the configuration. Configuration always deleted from top
229 func (c ConfigVersionStore) deleteConfigVersion() error {
230
231         counter, err := c.getCurrentVersion()
232
233         if err != nil {
234                 return pkgerrors.Wrap(err, "Get Next Version")
235         }
236         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
237         if err != nil {
238                 return pkgerrors.Wrap(err, "Retrieving model info")
239         }
240         versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(counter)))
241
242         err = db.Etcd.Delete(versionKey)
243         if err != nil {
244                 return pkgerrors.Wrap(err, "Delete Config DB Entry")
245         }
246         err = c.decrementVersion()
247         if err != nil {
248                 return pkgerrors.Wrap(err, "Decrement Version")
249         }
250         return nil
251 }
252
253 // Read the specified version of the configuration and return its prev and current value.
254 // Also returns the action for the config version
255 func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) {
256
257         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
258         if err != nil {
259                 return Config{}, Config{}, "", pkgerrors.Wrap(err, "Retrieving model info")
260         }
261         versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version)))
262         configBytes, err := db.Etcd.Get(versionKey)
263         if err != nil {
264                 return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ")
265         }
266
267         if configBytes != nil {
268                 pr := configVersionDBContent{}
269                 err = db.DeSerialize(string(configBytes), &pr)
270                 if err != nil {
271                         return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version")
272                 }
273                 return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil
274         }
275         return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ")
276 }
277
278 // Get the counter for the version
279 func (c ConfigVersionStore) getCurrentVersion() (uint, error) {
280
281         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
282         if err != nil {
283                 return 0, pkgerrors.Wrap(err, "Retrieving model info")
284         }
285         cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter)
286
287         value, err := db.Etcd.Get(cfgKey)
288         if err != nil {
289                 if strings.Contains(err.Error(), "Key doesn't exist") == true {
290                         // Counter not started yet, 0 is invalid value
291                         return 0, nil
292                 } else {
293                         return 0, pkgerrors.Wrap(err, "Get Current Version")
294                 }
295         }
296
297         index, err := strconv.Atoi(string(value))
298         if err != nil {
299                 return 0, pkgerrors.Wrap(err, "Invalid counter")
300         }
301         return uint(index), nil
302 }
303
304 // Update the counter for the version
305 func (c ConfigVersionStore) updateVersion(counter uint) error {
306
307         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
308         if err != nil {
309                 return pkgerrors.Wrap(err, "Retrieving model info")
310         }
311         cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter)
312         err = db.Etcd.Put(cfgKey, strconv.Itoa(int(counter)))
313         if err != nil {
314                 return pkgerrors.Wrap(err, "Counter DB Entry")
315         }
316         return nil
317 }
318
319 // Increment the version counter
320 func (c ConfigVersionStore) incrementVersion() (uint, error) {
321
322         counter, err := c.getCurrentVersion()
323         if err != nil {
324                 return 0, pkgerrors.Wrap(err, "Get Next Counter Value")
325         }
326         //This is done while Profile lock is taken
327         counter++
328         err = c.updateVersion(counter)
329         if err != nil {
330                 return 0, pkgerrors.Wrap(err, "Store Next Counter Value")
331         }
332
333         return counter, nil
334 }
335
336 // Decrement the version counter
337 func (c ConfigVersionStore) decrementVersion() error {
338
339         counter, err := c.getCurrentVersion()
340         if err != nil {
341                 return pkgerrors.Wrap(err, "Get Next Counter Value")
342         }
343         //This is done while Profile lock is taken
344         counter--
345         err = c.updateVersion(counter)
346         if err != nil {
347                 return pkgerrors.Wrap(err, "Store Next Counter Value")
348         }
349
350         return nil
351 }
352
353 // Apply Config
354 func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error {
355
356         rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
357         if err != nil {
358                 return pkgerrors.Wrap(err, "Retrieving model info")
359         }
360         // Get Template and Resolve the template with values
361         crl, err := resolve(rbName, rbVersion, profileName, p)
362         if err != nil {
363                 return pkgerrors.Wrap(err, "Resolve Config")
364         }
365         crl.action = action
366         // Send the configResourceList to the channel. Using select for non-blocking channel
367         select {
368         case pChannel <- crl:
369                 log.Printf("Message Sent to goroutine %v", crl.profile)
370         default:
371         }
372
373         return nil
374 }
375
376 // Per Profile Go routine to apply the configuration to Cloud Region
377 func scheduleResources(c chan configResourceList) {
378         // Keep thread running
379         for {
380                 data := <-c
381                 //TODO: ADD Check to see if Application running
382                 ic := NewInstanceClient()
383                 resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil)
384                 if err != nil || len(resp) == 0 {
385                         log.Println("Error finding a running instance. Retrying later...")
386                         time.Sleep(time.Second * 10)
387                         continue
388                 }
389                 switch {
390                 case data.action == "POST":
391                         log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates)
392                         for _, inst := range resp {
393                                 k8sClient := KubernetesClient{}
394                                 err = k8sClient.init(inst.Request.CloudRegion, inst.ID)
395                                 if err != nil {
396                                         log.Printf("Getting CloudRegion Information: %s", err.Error())
397                                         //Move onto the next cloud region
398                                         continue
399                                 }
400                                 //assuming - the resource is not exist already
401                                 data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
402                                 errCreate := err
403                                 if err != nil {
404                                         // assuming - the err represent the resource is already exist, so going for update
405                                         data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
406                                         if err != nil {
407                                                 log.Printf("Error Creating resources: %s", errCreate.Error())
408                                                 log.Printf("Error Updating resources: %s", err.Error())
409                                                 continue
410                                         }
411                                 }
412                         }
413                         //TODO: Needs to add code to call Kubectl create
414                 case data.action == "PUT":
415                         log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates)
416                         //TODO: Needs to add code to call Kubectl apply
417                 case data.action == "DELETE":
418                         log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates)
419                         for _, inst := range resp {
420                                 k8sClient := KubernetesClient{}
421                                 err = k8sClient.init(inst.Request.CloudRegion, inst.ID)
422                                 if err != nil {
423                                         log.Printf("Getting CloudRegion Information: %s", err.Error())
424                                         //Move onto the next cloud region
425                                         continue
426                                 }
427                                 err = k8sClient.deleteResources(data.createdResources, inst.Namespace)
428                                 if err != nil {
429                                         log.Printf("Error Deleting resources: %s", err.Error())
430                                         continue
431                                 }
432                         }
433                 }
434         }
435 }
436
437 //Resolve returns the path where the helm chart merged with
438 //configuration overrides resides.
439 var resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) {
440
441         var resTemplates []helm.KubernetesResourceTemplate
442
443         profile, err := rb.NewProfileClient().Get(rbName, rbVersion, profileName)
444         if err != nil {
445                 return configResourceList{}, pkgerrors.Wrap(err, "Reading Profile Data")
446         }
447
448         t, err := rb.NewConfigTemplateClient().Get(rbName, rbVersion, p.TemplateName)
449         if err != nil {
450                 return configResourceList{}, pkgerrors.Wrap(err, "Getting Template")
451         }
452         if t.ChartName == "" {
453                 return configResourceList{}, pkgerrors.New("Invalid template no Chart.yaml file found")
454         }
455
456         def, err := rb.NewConfigTemplateClient().Download(rbName, rbVersion, p.TemplateName)
457         if err != nil {
458                 return configResourceList{}, pkgerrors.Wrap(err, "Downloading Template")
459         }
460
461         //Create a temp file in the system temp folder for values input
462         b, err := json.Marshal(p.Values)
463         if err != nil {
464                 return configResourceList{}, pkgerrors.Wrap(err, "Error Marshalling config data")
465         }
466         data, err := yaml.JSONToYAML(b)
467         if err != nil {
468                 return configResourceList{}, pkgerrors.Wrap(err, "JSON to YAML")
469         }
470
471         outputfile, err := ioutil.TempFile("", "helm-config-values-")
472         if err != nil {
473                 return configResourceList{}, pkgerrors.Wrap(err, "Got error creating temp file")
474         }
475         _, err = outputfile.Write([]byte(data))
476         if err != nil {
477                 return configResourceList{}, pkgerrors.Wrap(err, "Got error writting temp file")
478         }
479         defer outputfile.Close()
480
481         chartBasePath, err := rb.ExtractTarBall(bytes.NewBuffer(def))
482         if err != nil {
483                 return configResourceList{}, pkgerrors.Wrap(err, "Extracting Template")
484         }
485
486         helmClient := helm.NewTemplateClient(profile.KubernetesVersion,
487                 profile.Namespace,
488                 profile.ReleaseName)
489
490         chartPath := filepath.Join(chartBasePath, t.ChartName)
491         resTemplates, err = helmClient.GenerateKubernetesArtifacts(chartPath,
492                 []string{outputfile.Name()},
493                 nil)
494         if err != nil {
495                 return configResourceList{}, pkgerrors.Wrap(err, "Generate final k8s yaml")
496         }
497         crl := configResourceList{
498                 resourceTemplates: resTemplates,
499                 profile:           profile,
500         }
501
502         return crl, nil
503 }
504
505 // Get the Mutex for the Profile
506 func getProfileData(key string) (*sync.Mutex, chan configResourceList) {
507         profileData.Lock()
508         defer profileData.Unlock()
509         _, ok := profileData.profileLockMap[key]
510         if !ok {
511                 profileData.profileLockMap[key] = &sync.Mutex{}
512         }
513         _, ok = profileData.resourceChannel[key]
514         if !ok {
515                 profileData.resourceChannel[key] = make(chan configResourceList)
516                 go scheduleResources(profileData.resourceChannel[key])
517         }
518         return profileData.profileLockMap[key], profileData.resourceChannel[key]
519 }