2 * Copyright 2018 Intel Corporation, Inc
3 * Copyright © 2021 Samsung Electronics
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
31 "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
32 "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
33 "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb"
35 "github.com/ghodss/yaml"
36 pkgerrors "github.com/pkg/errors"
39 //ConfigStore contains the values that will be stored in the database
40 type configVersionDBContent struct {
41 ConfigNew Config `json:"config-new"`
42 ConfigPrev Config `json:"config-prev"`
43 Action string `json:"action"` // CRUD opration for this config
46 //ConfigStore to Store the Config
47 type ConfigStore struct {
52 //ConfigVersionStore to Store the Versions of the Config
53 type ConfigVersionStore struct {
57 type configResourceList struct {
58 resourceTemplates []helm.KubernetesResourceTemplate
59 createdResources []helm.KubernetesResource
64 type profileDataManager struct {
65 profileLockMap map[string]*sync.Mutex
66 resourceChannel map[string](chan configResourceList)
72 tagCounter = "counter"
73 tagVersion = "configversion"
74 tagConfig = "configdata"
77 var profileData = profileDataManager{
78 profileLockMap: map[string]*sync.Mutex{},
79 resourceChannel: map[string]chan configResourceList{},
82 // Construct key for storing data
83 func constructKey(strs ...string) string {
85 var sb strings.Builder
86 sb.WriteString("onapk8s")
88 sb.WriteString(storeName)
90 for _, str := range strs {
98 // Create an entry for the config in the database
99 func (c ConfigStore) createConfig(p Config) error {
101 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
103 return pkgerrors.Wrap(err, "Retrieving model info")
105 cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig, p.ConfigName)
106 _, err = db.Etcd.Get(cfgKey)
108 return pkgerrors.Wrap(err, "Config DB Entry Already exists")
110 configValue, err := db.Serialize(p)
112 return pkgerrors.Wrap(err, "Serialize Config Value")
114 err = db.Etcd.Put(cfgKey, configValue)
116 return pkgerrors.Wrap(err, "Config DB Entry")
121 // Update the config entry in the database. Updates with the new value
122 // Returns the previous value of the Config
123 func (c ConfigStore) updateConfig(p Config) (Config, error) {
125 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
127 return Config{}, pkgerrors.Wrap(err, "Retrieving model info")
129 cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig, p.ConfigName)
130 value, err := db.Etcd.Get(cfgKey)
131 configPrev := Config{}
133 // If updating Config after rollback then previous config may not exist
134 err = db.DeSerialize(string(value), &configPrev)
136 return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value")
139 configValue, err := db.Serialize(p)
141 return Config{}, pkgerrors.Wrap(err, "Serialize Config Value")
143 err = db.Etcd.Put(cfgKey, configValue)
145 return Config{}, pkgerrors.Wrap(err, "Config DB Entry")
147 return configPrev, nil
150 // Read the config entry in the database
151 func (c ConfigStore) getConfig() (Config, error) {
152 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
154 return Config{}, pkgerrors.Wrap(err, "Retrieving model info")
156 cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig, c.configName)
157 value, err := db.Etcd.Get(cfgKey)
159 return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
161 //value is a byte array
164 err = db.DeSerialize(string(value), &cfg)
166 return Config{}, pkgerrors.Wrap(err, "Unmarshaling Config Value")
170 return Config{}, pkgerrors.Wrap(err, "Get Config DB Entry")
173 // Delete the config entry in the database
174 func (c ConfigStore) deleteConfig() (Config, error) {
176 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
178 return Config{}, pkgerrors.Wrap(err, "Retrieving model info")
180 cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagConfig, c.configName)
181 value, err := db.Etcd.Get(cfgKey)
183 return Config{}, pkgerrors.Wrap(err, "Config DB Entry Not found")
185 configPrev := Config{}
186 err = db.DeSerialize(string(value), &configPrev)
188 return Config{}, pkgerrors.Wrap(err, "DeSerialize Config Value")
191 err = db.Etcd.Delete(cfgKey)
193 return Config{}, pkgerrors.Wrap(err, "Config DB Entry")
195 return configPrev, nil
198 // Create a version for the configuration. If previous config provided that is also stored
199 func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) {
201 version, err := c.incrementVersion()
204 return 0, pkgerrors.Wrap(err, "Get Next Version")
206 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
208 return 0, pkgerrors.Wrap(err, "Retrieving model info")
210 versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version)))
212 var cs configVersionDBContent
214 cs.ConfigNew = configNew
215 cs.ConfigPrev = configPrev
217 configValue, err := db.Serialize(cs)
219 return 0, pkgerrors.Wrap(err, "Serialize Config Value")
221 err = db.Etcd.Put(versionKey, configValue)
223 return 0, pkgerrors.Wrap(err, "Create Config DB Entry")
228 // Delete current version of the configuration. Configuration always deleted from top
229 func (c ConfigVersionStore) deleteConfigVersion() error {
231 counter, err := c.getCurrentVersion()
234 return pkgerrors.Wrap(err, "Get Next Version")
236 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
238 return pkgerrors.Wrap(err, "Retrieving model info")
240 versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(counter)))
242 err = db.Etcd.Delete(versionKey)
244 return pkgerrors.Wrap(err, "Delete Config DB Entry")
246 err = c.decrementVersion()
248 return pkgerrors.Wrap(err, "Decrement Version")
253 // Read the specified version of the configuration and return its prev and current value.
254 // Also returns the action for the config version
255 func (c ConfigVersionStore) getConfigVersion(version uint) (Config, Config, string, error) {
257 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
259 return Config{}, Config{}, "", pkgerrors.Wrap(err, "Retrieving model info")
261 versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, strconv.Itoa(int(version)))
262 configBytes, err := db.Etcd.Get(versionKey)
264 return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ")
267 if configBytes != nil {
268 pr := configVersionDBContent{}
269 err = db.DeSerialize(string(configBytes), &pr)
271 return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version")
273 return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil
275 return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ")
278 // Get the counter for the version
279 func (c ConfigVersionStore) getCurrentVersion() (uint, error) {
281 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
283 return 0, pkgerrors.Wrap(err, "Retrieving model info")
285 cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter)
287 value, err := db.Etcd.Get(cfgKey)
289 if strings.Contains(err.Error(), "Key doesn't exist") == true {
290 // Counter not started yet, 0 is invalid value
293 return 0, pkgerrors.Wrap(err, "Get Current Version")
297 index, err := strconv.Atoi(string(value))
299 return 0, pkgerrors.Wrap(err, "Invalid counter")
301 return uint(index), nil
304 // Update the counter for the version
305 func (c ConfigVersionStore) updateVersion(counter uint) error {
307 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
309 return pkgerrors.Wrap(err, "Retrieving model info")
311 cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter)
312 err = db.Etcd.Put(cfgKey, strconv.Itoa(int(counter)))
314 return pkgerrors.Wrap(err, "Counter DB Entry")
319 // Increment the version counter
320 func (c ConfigVersionStore) incrementVersion() (uint, error) {
322 counter, err := c.getCurrentVersion()
324 return 0, pkgerrors.Wrap(err, "Get Next Counter Value")
326 //This is done while Profile lock is taken
328 err = c.updateVersion(counter)
330 return 0, pkgerrors.Wrap(err, "Store Next Counter Value")
336 // Decrement the version counter
337 func (c ConfigVersionStore) decrementVersion() error {
339 counter, err := c.getCurrentVersion()
341 return pkgerrors.Wrap(err, "Get Next Counter Value")
343 //This is done while Profile lock is taken
345 err = c.updateVersion(counter)
347 return pkgerrors.Wrap(err, "Store Next Counter Value")
354 func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error {
356 rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
358 return pkgerrors.Wrap(err, "Retrieving model info")
360 // Get Template and Resolve the template with values
361 crl, err := resolve(rbName, rbVersion, profileName, p)
363 return pkgerrors.Wrap(err, "Resolve Config")
366 // Send the configResourceList to the channel. Using select for non-blocking channel
368 case pChannel <- crl:
369 log.Printf("Message Sent to goroutine %v", crl.profile)
376 // Per Profile Go routine to apply the configuration to Cloud Region
377 func scheduleResources(c chan configResourceList) {
378 // Keep thread running
381 //TODO: ADD Check to see if Application running
382 ic := NewInstanceClient()
383 resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil)
384 if err != nil || len(resp) == 0 {
385 log.Println("Error finding a running instance. Retrying later...")
386 time.Sleep(time.Second * 10)
390 case data.action == "POST":
391 log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates)
392 for _, inst := range resp {
393 k8sClient := KubernetesClient{}
394 err = k8sClient.init(inst.Request.CloudRegion, inst.ID)
396 log.Printf("Getting CloudRegion Information: %s", err.Error())
397 //Move onto the next cloud region
400 //assuming - the resource is not exist already
401 data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
404 // assuming - the err represent the resource is already exist, so going for update
405 data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
407 log.Printf("Error Creating resources: %s", errCreate.Error())
408 log.Printf("Error Updating resources: %s", err.Error())
413 //TODO: Needs to add code to call Kubectl create
414 case data.action == "PUT":
415 log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates)
416 //TODO: Needs to add code to call Kubectl apply
417 case data.action == "DELETE":
418 log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates)
419 for _, inst := range resp {
420 k8sClient := KubernetesClient{}
421 err = k8sClient.init(inst.Request.CloudRegion, inst.ID)
423 log.Printf("Getting CloudRegion Information: %s", err.Error())
424 //Move onto the next cloud region
427 err = k8sClient.deleteResources(data.createdResources, inst.Namespace)
429 log.Printf("Error Deleting resources: %s", err.Error())
437 //Resolve returns the path where the helm chart merged with
438 //configuration overrides resides.
439 var resolve = func(rbName, rbVersion, profileName string, p Config) (configResourceList, error) {
441 var resTemplates []helm.KubernetesResourceTemplate
443 profile, err := rb.NewProfileClient().Get(rbName, rbVersion, profileName)
445 return configResourceList{}, pkgerrors.Wrap(err, "Reading Profile Data")
448 t, err := rb.NewConfigTemplateClient().Get(rbName, rbVersion, p.TemplateName)
450 return configResourceList{}, pkgerrors.Wrap(err, "Getting Template")
452 if t.ChartName == "" {
453 return configResourceList{}, pkgerrors.New("Invalid template no Chart.yaml file found")
456 def, err := rb.NewConfigTemplateClient().Download(rbName, rbVersion, p.TemplateName)
458 return configResourceList{}, pkgerrors.Wrap(err, "Downloading Template")
461 //Create a temp file in the system temp folder for values input
462 b, err := json.Marshal(p.Values)
464 return configResourceList{}, pkgerrors.Wrap(err, "Error Marshalling config data")
466 data, err := yaml.JSONToYAML(b)
468 return configResourceList{}, pkgerrors.Wrap(err, "JSON to YAML")
471 outputfile, err := ioutil.TempFile("", "helm-config-values-")
473 return configResourceList{}, pkgerrors.Wrap(err, "Got error creating temp file")
475 _, err = outputfile.Write([]byte(data))
477 return configResourceList{}, pkgerrors.Wrap(err, "Got error writting temp file")
479 defer outputfile.Close()
481 chartBasePath, err := rb.ExtractTarBall(bytes.NewBuffer(def))
483 return configResourceList{}, pkgerrors.Wrap(err, "Extracting Template")
486 helmClient := helm.NewTemplateClient(profile.KubernetesVersion,
490 chartPath := filepath.Join(chartBasePath, t.ChartName)
491 resTemplates, err = helmClient.GenerateKubernetesArtifacts(chartPath,
492 []string{outputfile.Name()},
495 return configResourceList{}, pkgerrors.Wrap(err, "Generate final k8s yaml")
497 crl := configResourceList{
498 resourceTemplates: resTemplates,
505 // Get the Mutex for the Profile
506 func getProfileData(key string) (*sync.Mutex, chan configResourceList) {
508 defer profileData.Unlock()
509 _, ok := profileData.profileLockMap[key]
511 profileData.profileLockMap[key] = &sync.Mutex{}
513 _, ok = profileData.resourceChannel[key]
515 profileData.resourceChannel[key] = make(chan configResourceList)
516 go scheduleResources(profileData.resourceChannel[key])
518 return profileData.profileLockMap[key], profileData.resourceChannel[key]