Further fixes for config delete operation 65/124665/1
authorLukasz Rajewski <lukasz.rajewski@orange.com>
Fri, 1 Oct 2021 07:35:35 +0000 (09:35 +0200)
committerLukasz Rajewski <lukasz.rajewski@orange.com>
Mon, 4 Oct 2021 10:06:25 +0000 (12:06 +0200)
The issue was related with insufficient handlijg of
different versions of config vs their delete operation
handled by the plugin.

Issue-ID: MULTICLOUD-1332
Signed-off-by: Lukasz Rajewski <lukasz.rajewski@orange.com>
Change-Id: I90d896720fa89ebd66cb3290cdd9401272f5e3fd

12 files changed:
src/k8splugin/api/api.go
src/k8splugin/api/confighandler.go
src/k8splugin/internal/app/client.go
src/k8splugin/internal/app/config.go
src/k8splugin/internal/app/config_backend.go
src/k8splugin/internal/app/config_test.go
src/k8splugin/internal/app/instance.go
src/k8splugin/internal/db/etcd.go
src/k8splugin/internal/db/etcd_testing.go
src/k8splugin/plugins/generic/plugin.go
src/k8splugin/plugins/namespace/plugin.go
src/k8splugin/plugins/service/plugin.go

index e34b93a..ed23f39 100644 (file)
@@ -137,7 +137,8 @@ func NewRouter(defClient rb.DefinitionManager,
        instRouter.HandleFunc("/instance/{instID}/config", configHandler.listHandler).Methods("GET")
        instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.getHandler).Methods("GET")
        instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.updateHandler).Methods("PUT")
-       instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.deleteHandler).Methods("DELETE")
+       instRouter.HandleFunc("/instance/{instID}/config/{cfgname}", configHandler.deleteAllHandler).Methods("DELETE")
+       instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/delete", configHandler.deleteHandler).Methods("POST")
        instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/rollback", configHandler.rollbackHandler).Methods("POST")
        instRouter.HandleFunc("/instance/{instID}/config/{cfgname}/tagit", configHandler.tagitHandler).Methods("POST")
 
index b0a8f85..a4f0813 100644 (file)
@@ -117,6 +117,22 @@ func (h rbConfigHandler) listHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 // deleteHandler handles DELETE operations on a config
+func (h rbConfigHandler) deleteAllHandler(w http.ResponseWriter, r *http.Request) {
+       vars := mux.Vars(r)
+       instanceID := vars["instID"]
+       cfgName := vars["cfgname"]
+
+       err := h.client.DeleteAll(instanceID, cfgName)
+       if err != nil {
+               http.Error(w, err.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusAccepted)
+}
+
+// deleteHandler handles delete operations on a config creating its delete version
 func (h rbConfigHandler) deleteHandler(w http.ResponseWriter, r *http.Request) {
        vars := mux.Vars(r)
        instanceID := vars["instID"]
@@ -184,12 +200,13 @@ func (h rbConfigHandler) rollbackHandler(w http.ResponseWriter, r *http.Request)
        }
 
        var p app.ConfigRollback
-       err := json.NewDecoder(r.Body).Decode(&p)
+       err := json.NewDecoder(r.Body).Decode(&p.AnyOf)
        if err != nil {
                http.Error(w, err.Error(), http.StatusUnprocessableEntity)
                return
        }
        err = h.client.Rollback(instanceID, cfgName, p)
+       //err = h.client.Cleanup(instanceID)
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
index 9813333..06c4c46 100644 (file)
@@ -20,25 +20,30 @@ package app
 import (
        "context"
        "io/ioutil"
+
        appsv1 "k8s.io/api/apps/v1"
+
        //appsv1beta1 "k8s.io/api/apps/v1beta1"
        //appsv1beta2 "k8s.io/api/apps/v1beta2"
        batchv1 "k8s.io/api/batch/v1"
        corev1 "k8s.io/api/core/v1"
+
        //extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
        //apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
        //apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
-       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "os"
        "strings"
        "time"
 
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+       logger "log"
+
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/config"
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/connection"
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
        log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils"
        "github.com/onap/multicloud-k8s/src/k8splugin/internal/plugin"
-       logger "log"
 
        pkgerrors "github.com/pkg/errors"
        "k8s.io/apimachinery/pkg/api/meta"
@@ -545,9 +550,18 @@ func (k *KubernetesClient) DeleteKind(resource helm.KubernetesResource, namespac
                return pkgerrors.Wrap(err, "Error loading plugin")
        }
 
-       err = pluginImpl.Delete(resource, namespace, k)
-       if err != nil {
-               return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
+       name, err := pluginImpl.Get(resource, namespace, k)
+
+       if (err == nil && name == resource.Name) || (err != nil && strings.Contains(err.Error(), "not found") == false) {
+               err = pluginImpl.Delete(resource, namespace, k)
+               if err != nil {
+                       return pkgerrors.Wrap(err, "Error deleting "+resource.Name)
+               }
+       } else {
+               log.Warn("Resource does not exist, Skipping delete", log.Fields{
+                       "gvk":      resource.GVK,
+                       "resource": resource.Name,
+               })
        }
 
        return nil
index a25ab54..8952c16 100644 (file)
 package app
 
 import (
+       "log"
        "strconv"
        "strings"
 
-       "github.com/onap/multicloud-k8s/src/k8splugin/internal/db"
+       "github.com/onap/multicloud-k8s/src/k8splugin/internal/helm"
 
        pkgerrors "github.com/pkg/errors"
 )
@@ -66,7 +67,9 @@ type ConfigManager interface {
        Help() map[string]string
        Update(instanceID, configName string, p Config) (ConfigResult, error)
        Delete(instanceID, configName string) (ConfigResult, error)
+       DeleteAll(instanceID, configName string) error
        Rollback(instanceID string, configName string, p ConfigRollback) error
+       Cleanup(instanceID string) error
        Tagit(instanceID string, configName string, p ConfigTagit) error
 }
 
@@ -94,7 +97,7 @@ func (v *ConfigClient) Help() map[string]string {
 
 // Create an entry for the config in the database
 func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error) {
-
+       log.Printf("[Config Create] Instance %s", instanceID)
        // Check required fields
        if p.ConfigName == "" || p.TemplateName == "" || len(p.Values) == 0 {
                return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided")
@@ -120,10 +123,12 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
        // Acquire per profile Mutex
        lock.Lock()
        defer lock.Unlock()
-       err = applyConfig(instanceID, p, profileChannel, "POST")
+       var appliedResources ([]helm.KubernetesResource)
+       appliedResources, err = applyConfig(instanceID, p, profileChannel, "POST", nil)
        if err != nil {
                return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config failed")
        }
+       log.Printf("POST result: %s", appliedResources)
        // Create Config DB Entry
        err = cs.createConfig(p)
        if err != nil {
@@ -134,7 +139,7 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
                instanceID: instanceID,
                configName: p.ConfigName,
        }
-       version, err := cvs.createConfigVersion(p, Config{}, "POST")
+       version, err := cvs.createConfigVersion(p, Config{}, "POST", appliedResources)
        if err != nil {
                return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry")
        }
@@ -154,7 +159,7 @@ func (v *ConfigClient) Create(instanceID string, p Config) (ConfigResult, error)
 
 // Update an entry for the config in the database
 func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigResult, error) {
-
+       log.Printf("[Config Update] Instance %s Config %s", instanceID, configName)
        // Check required fields
        if len(p.Values) == 0 {
                return ConfigResult{}, pkgerrors.New("Incomplete Configuration Provided")
@@ -177,10 +182,12 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe
        // Acquire per profile Mutex
        lock.Lock()
        defer lock.Unlock()
-       err = applyConfig(instanceID, p, profileChannel, "PUT")
+       var appliedResources ([]helm.KubernetesResource)
+       appliedResources, err = applyConfig(instanceID, p, profileChannel, "PUT", nil)
        if err != nil {
                return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config  failed")
        }
+       log.Printf("PUT result: %s", appliedResources)
        // Update Config DB Entry
        configPrev, err := cs.updateConfig(p)
        if err != nil {
@@ -191,7 +198,7 @@ func (v *ConfigClient) Update(instanceID, configName string, p Config) (ConfigRe
                instanceID: instanceID,
                configName: configName,
        }
-       version, err := cvs.createConfigVersion(p, configPrev, "PUT")
+       version, err := cvs.createConfigVersion(p, configPrev, "PUT", appliedResources)
        if err != nil {
                return ConfigResult{}, pkgerrors.Wrap(err, "Create Config Version DB Entry")
        }
@@ -247,8 +254,49 @@ func (v *ConfigClient) List(instanceID string) ([]Config, error) {
 }
 
 // Delete the Config from database
-func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) {
+func (v *ConfigClient) DeleteAll(instanceID, configName string) error {
+       log.Printf("[Config Delete All] Instance %s Config %s", instanceID, configName)
+       // Check if Config exists
+       cs := ConfigStore{
+               instanceID: instanceID,
+               configName: configName,
+       }
+       _, err := cs.getConfig()
+       if err != nil {
+               return pkgerrors.Wrap(err, "Update Error - Config doesn't exist")
+       }
+       // Get Version Entry in DB for Config
+       cvs := ConfigVersionStore{
+               instanceID: instanceID,
+               configName: configName,
+       }
+       currentVersion, err := cvs.getCurrentVersion(configName)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Current version get failed")
+       }
+       _, _, action, _, err := cvs.getConfigVersion(configName, currentVersion)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Config version get failed")
+       }
+
+       if action != "DELETE" {
+               _, err = v.Delete(instanceID, configName)
+               if err != nil {
+                       return pkgerrors.Wrap(err, "Config  DELETE version failed")
+               }
+       }
+       // Delete Config from DB
+       _, err = cs.deleteConfig()
+       if err != nil {
+               return pkgerrors.Wrap(err, "Delete Config DB Entry")
+       }
+       cvs.cleanupIstanceTags(configName)
+       return nil
+}
 
+// Apply update with delete operation
+func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, error) {
+       log.Printf("[Config Delete] Instance %s Config %s", instanceID, configName)
        // Resolving rbName, Version, etc. not to break response
        rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
        if err != nil {
@@ -261,29 +309,39 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro
        }
        p, err := cs.getConfig()
        if err != nil {
-               return ConfigResult{}, pkgerrors.Wrap(err, "Update Error - Config doesn't exist")
+               return ConfigResult{}, pkgerrors.Wrap(err, "Delete Error - Config doesn't exist")
        }
        lock, profileChannel := getProfileData(instanceID)
        // Acquire per profile Mutex
        lock.Lock()
        defer lock.Unlock()
-       err = applyConfig(instanceID, p, profileChannel, "DELETE")
-       if err != nil {
-               return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config  failed")
-       }
-       // Delete Config from DB
-       configPrev, err := cs.deleteConfig()
-       if err != nil {
-               return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config DB Entry")
-       }
        // Create Version Entry in DB for Config
        cvs := ConfigVersionStore{
                instanceID: instanceID,
                configName: configName,
        }
-       version, err := cvs.createConfigVersion(Config{}, configPrev, "DELETE")
+       currentVersion, err := cvs.getCurrentVersion(configName)
+       if err != nil {
+               return ConfigResult{}, pkgerrors.Wrap(err, "Current version get failed")
+       }
+       _, _, _, resources, err := cvs.getConfigVersion(configName, currentVersion)
+       if err != nil {
+               return ConfigResult{}, pkgerrors.Wrap(err, "Config version get failed")
+       }
+
+       _, err = applyConfig(instanceID, p, profileChannel, "DELETE", resources)
+       if err != nil {
+               return ConfigResult{}, pkgerrors.Wrap(err, "Apply Config  failed")
+       }
+       log.Printf("DELETE resources: [%s]", resources)
+       // Update Config from DB
+       configPrev, err := cs.updateConfig(p)
        if err != nil {
-               return ConfigResult{}, pkgerrors.Wrap(err, "Delete Config Version DB Entry")
+               return ConfigResult{}, pkgerrors.Wrap(err, "Update Config DB Entry")
+       }
+       version, err := cvs.createConfigVersion(p, configPrev, "DELETE", []helm.KubernetesResource{})
+       if err != nil {
+               return ConfigResult{}, pkgerrors.Wrap(err, "Create Delete Config Version DB Entry")
        }
 
        // Create Result structure
@@ -301,7 +359,7 @@ func (v *ConfigClient) Delete(instanceID, configName string) (ConfigResult, erro
 
 // Rollback starts from current version and rollbacks to the version desired
 func (v *ConfigClient) Rollback(instanceID string, configName string, rback ConfigRollback) error {
-
+       log.Printf("[Config Rollback] Instance %s Config %s", instanceID, configName)
        var reqVersion string
        var err error
 
@@ -342,40 +400,35 @@ func (v *ConfigClient) Rollback(instanceID string, configName string, rback Conf
 
        //Rollback all the intermettinent configurations
        for i := currentVersion; i > rollbackIndex; i-- {
-               configNew, configPrev, action, err := cvs.getConfigVersion(configName, i)
+               configNew, configPrev, _, resources, err := cvs.getConfigVersion(configName, i)
                if err != nil {
                        return pkgerrors.Wrap(err, "Rollback Get Config Version")
                }
+               _, _, prevAction, _, err := cvs.getConfigVersion(configName, i-1)
+               if err != nil {
+                       return pkgerrors.Wrap(err, "Rollback Get Prev Config Version")
+               }
                cs := ConfigStore{
                        instanceID: instanceID,
                        configName: configNew.ConfigName,
                }
-               if action == "PUT" {
-                       // PUT is proceeded by PUT or POST
-                       err = applyConfig(instanceID, configPrev, profileChannel, "PUT")
+               if prevAction != "DELETE" {
+                       appliedResources, err := applyConfig(instanceID, configPrev, profileChannel, prevAction, nil)
                        if err != nil {
                                return pkgerrors.Wrap(err, "Apply Config  failed")
                        }
+                       log.Printf("%s result: %s", prevAction, appliedResources)
                        _, err = cs.updateConfig(configPrev)
                        if err != nil {
                                return pkgerrors.Wrap(err, "Update Config DB Entry")
                        }
-               } else if action == "POST" {
+               } else {
                        // POST is always preceeded by Config not existing
-                       err = applyConfig(instanceID, configNew, profileChannel, "DELETE")
-                       if err != nil {
-                               return pkgerrors.Wrap(err, "Delete Config  failed")
-                       }
-                       _, err = cs.deleteConfig()
-                       if err != nil {
-                               return pkgerrors.Wrap(err, "Delete Config DB Entry")
-                       }
-               } else if action == "DELETE" {
-                       // DELETE is proceeded by PUT or POST
-                       err = applyConfig(instanceID, configPrev, profileChannel, "PUT")
+                       _, err := applyConfig(instanceID, configNew, profileChannel, prevAction, resources)
                        if err != nil {
                                return pkgerrors.Wrap(err, "Delete Config  failed")
                        }
+                       log.Printf("DELETE resources: %s", resources)
                        _, err = cs.updateConfig(configPrev)
                        if err != nil {
                                return pkgerrors.Wrap(err, "Update Config DB Entry")
@@ -394,11 +447,7 @@ func (v *ConfigClient) Rollback(instanceID string, configName string, rback Conf
 
 // Tagit tags the current version with the tag provided
 func (v *ConfigClient) Tagit(instanceID string, configName string, tag ConfigTagit) error {
-
-       rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
-       if err != nil {
-               return pkgerrors.Wrap(err, "Retrieving model info")
-       }
+       log.Printf("[Config Tag It] Instance %s Config %s", instanceID, configName)
        lock, _ := getProfileData(instanceID)
        // Acquire per profile Mutex
        lock.Lock()
@@ -408,38 +457,52 @@ func (v *ConfigClient) Tagit(instanceID string, configName string, tag ConfigTag
                instanceID: instanceID,
                configName: configName,
        }
-       currentVersion, err := cvs.getCurrentVersion(configName)
-       if err != nil {
-               return pkgerrors.Wrap(err, "Get Current Config Version ")
-       }
-       tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, configName, tag.TagName)
-
-       err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion)))
+       err := cvs.tagCurrentVersion(configName, tag.TagName)
        if err != nil {
-               return pkgerrors.Wrap(err, "TagIt store DB")
+               return pkgerrors.Wrap(err, "Tag of current version failed")
        }
        return nil
 }
 
 // GetTagVersion returns the version associated with the tag
 func (v *ConfigClient) GetTagVersion(instanceID, configName string, tagName string) (string, error) {
-
-       rbName, rbVersion, profileName, _, err := resolveModelFromInstance(instanceID)
+       log.Printf("[Config Get Tag Version] Instance %s Config %s", instanceID, configName)
+       cvs := ConfigVersionStore{
+               instanceID: instanceID,
+               configName: configName,
+       }
+       value, err := cvs.getTagVersion(configName, tagName)
        if err != nil {
-               return "", pkgerrors.Wrap(err, "Retrieving model info")
+               return "", pkgerrors.Wrap(err, "Tag of current version failed")
        }
-       tagKey := constructKey(rbName, rbVersion, profileName, instanceID, v.tagTag, configName, tagName)
 
-       value, err := db.Etcd.Get(tagKey)
+       return value, nil
+}
+
+// Cleanup version used only when instance is being deleted. We do not pass errors and we try to delete data
+func (v *ConfigClient) Cleanup(instanceID string) error {
+       log.Printf("[Config Cleanup] Instance %s", instanceID)
+       configs, err := v.List(instanceID)
+
        if err != nil {
-               return "", pkgerrors.Wrap(err, "Config DB Entry Not found")
+               return pkgerrors.Wrap(err, "Retrieving active config list info")
+       }
+
+       for _, config := range configs {
+               err = v.DeleteAll(instanceID, config.ConfigName)
+               if err != nil {
+                       log.Printf("Config %s delete failed: %s", config.ConfigName, err.Error())
+               }
        }
-       return string(value), nil
+
+       removeProfileData(instanceID)
+
+       return nil
 }
 
 // ApplyAllConfig starts from first configuration version and applies all versions in sequence
 func (v *ConfigClient) ApplyAllConfig(instanceID string, configName string) error {
-
+       log.Printf("[Config Apply All] Instance %s Config %s", instanceID, configName)
        lock, profileChannel := getProfileData(instanceID)
        // Acquire per profile Mutex
        lock.Lock()
@@ -459,14 +522,19 @@ func (v *ConfigClient) ApplyAllConfig(instanceID string, configName string) erro
        //Apply all configurations
        var i uint
        for i = 1; i <= currentVersion; i++ {
-               configNew, _, action, err := cvs.getConfigVersion(configName, i)
+               configNew, _, action, resources, err := cvs.getConfigVersion(configName, i)
                if err != nil {
                        return pkgerrors.Wrap(err, "Get Config Version")
                }
-               err = applyConfig(instanceID, configNew, profileChannel, action)
+               if action != "DELETE" {
+                       resources = nil
+               }
+               var appliedResources ([]helm.KubernetesResource)
+               appliedResources, err = applyConfig(instanceID, configNew, profileChannel, action, resources)
                if err != nil {
                        return pkgerrors.Wrap(err, "Apply Config  failed")
                }
+               log.Printf("%s result: %s", action, appliedResources)
        }
        return nil
 }
index 3e5d8a3..4fedb38 100644 (file)
@@ -38,9 +38,10 @@ import (
 
 //ConfigStore contains the values that will be stored in the database
 type configVersionDBContent struct {
-       ConfigNew  Config `json:"config-new"`
-       ConfigPrev Config `json:"config-prev"`
-       Action     string `json:"action"` // CRUD opration for this config
+       ConfigNew  Config                    `json:"config-new"`
+       ConfigPrev Config                    `json:"config-prev"`
+       Action     string                    `json:"action"` // CRUD opration for this config
+       Resources  []helm.KubernetesResource `json:"resources"`
 }
 
 //ConfigStore to Store the Config
@@ -57,7 +58,8 @@ type ConfigVersionStore struct {
 
 type configResourceList struct {
        resourceTemplates []helm.KubernetesResourceTemplate
-       createdResources  []helm.KubernetesResource
+       resources         []helm.KubernetesResource
+       updatedResources  chan []helm.KubernetesResource
        profile           rb.Profile
        action            string
 }
@@ -72,6 +74,7 @@ const (
        storeName  = "config"
        tagCounter = "counter"
        tagVersion = "configversion"
+       tagName    = "configtag"
        tagConfig  = "configdata"
 )
 
@@ -223,8 +226,37 @@ func (c ConfigStore) deleteConfig() (Config, error) {
        return configPrev, nil
 }
 
+//Cleanup stored data in etcd before instance is being deleted
+func (c ConfigVersionStore) cleanupIstanceTags(configName string) error {
+
+       rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Retrieving model info")
+       }
+
+       versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName)
+       err = db.Etcd.DeletePrefix(versionKey)
+       if err != nil {
+               log.Printf("Deleting versions of instance failed: %s", err.Error())
+       }
+
+       counterKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
+       err = db.Etcd.DeletePrefix(counterKey)
+       if err != nil {
+               log.Printf("Deleting counters of instance failed: %s", err.Error())
+       }
+
+       nameKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName)
+       err = db.Etcd.DeletePrefix(nameKey)
+       if err != nil {
+               log.Printf("Deleting counters of instance failed: %s", err.Error())
+       }
+
+       return nil
+}
+
 // Create a version for the configuration. If previous config provided that is also stored
-func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string) (uint, error) {
+func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, action string, resources []helm.KubernetesResource) (uint, error) {
 
        configName := ""
        if configNew.ConfigName != "" {
@@ -249,6 +281,7 @@ func (c ConfigVersionStore) createConfigVersion(configNew, configPrev Config, ac
        cs.Action = action
        cs.ConfigNew = configNew
        cs.ConfigPrev = configPrev
+       cs.Resources = resources //[]helm.KubernetesResource{}
 
        configValue, err := db.Serialize(cs)
        if err != nil {
@@ -288,27 +321,27 @@ func (c ConfigVersionStore) deleteConfigVersion(configName string) error {
 
 // Read the specified version of the configuration and return its prev and current value.
 // Also returns the action for the config version
-func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, error) {
+func (c ConfigVersionStore) getConfigVersion(configName string, version uint) (Config, Config, string, []helm.KubernetesResource, error) {
 
        rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
        if err != nil {
-               return Config{}, Config{}, "", pkgerrors.Wrap(err, "Retrieving model info")
+               return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info")
        }
        versionKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagVersion, configName, strconv.Itoa(int(version)))
        configBytes, err := db.Etcd.Get(versionKey)
        if err != nil {
-               return Config{}, Config{}, "", pkgerrors.Wrap(err, "Get Config Version ")
+               return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Get Config Version ")
        }
 
        if configBytes != nil {
                pr := configVersionDBContent{}
                err = db.DeSerialize(string(configBytes), &pr)
                if err != nil {
-                       return Config{}, Config{}, "", pkgerrors.Wrap(err, "DeSerialize Config Version")
+                       return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "DeSerialize Config Version")
                }
-               return pr.ConfigNew, pr.ConfigPrev, pr.Action, nil
+               return pr.ConfigNew, pr.ConfigPrev, pr.Action, pr.Resources, nil
        }
-       return Config{}, Config{}, "", pkgerrors.Wrap(err, "Invalid data ")
+       return Config{}, Config{}, "", []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Invalid data ")
 }
 
 // Get the counter for the version
@@ -318,7 +351,7 @@ func (c ConfigVersionStore) getCurrentVersion(configName string) (uint, error) {
        if err != nil {
                return 0, pkgerrors.Wrap(err, "Retrieving model info")
        }
-       cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, configName, tagCounter)
+       cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
 
        value, err := db.Etcd.Get(cfgKey)
        if err != nil {
@@ -344,7 +377,7 @@ func (c ConfigVersionStore) updateVersion(configName string, counter uint) error
        if err != nil {
                return pkgerrors.Wrap(err, "Retrieving model info")
        }
-       cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, configName, tagCounter)
+       cfgKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagCounter, configName)
        err = db.Etcd.Put(cfgKey, strconv.Itoa(int(counter)))
        if err != nil {
                return pkgerrors.Wrap(err, "Counter DB Entry")
@@ -386,45 +419,87 @@ func (c ConfigVersionStore) decrementVersion(configName string) error {
        return nil
 }
 
+// Get tag version
+func (c ConfigVersionStore) getTagVersion(configName, tagNameValue string) (string, error) {
+       rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Retrieving model info")
+       }
+       tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue)
+
+       value, err := db.Etcd.Get(tagKey)
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Config DB Entry Not found")
+       }
+       return string(value), nil
+}
+
+// Tag current version
+func (c ConfigVersionStore) tagCurrentVersion(configName, tagNameValue string) error {
+       currentVersion, err := c.getCurrentVersion(configName)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Get Current Config Version ")
+       }
+       rbName, rbVersion, profileName, _, err := resolveModelFromInstance(c.instanceID)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Retrieving model info")
+       }
+       tagKey := constructKey(rbName, rbVersion, profileName, c.instanceID, tagName, configName, tagNameValue)
+
+       err = db.Etcd.Put(tagKey, strconv.Itoa(int(currentVersion)))
+       if err != nil {
+               return pkgerrors.Wrap(err, "TagIt store DB")
+       }
+       return nil
+}
+
 // Apply Config
-func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string) error {
+func applyConfig(instanceID string, p Config, pChannel chan configResourceList, action string, resources []helm.KubernetesResource) ([]helm.KubernetesResource, error) {
 
        rbName, rbVersion, profileName, releaseName, err := resolveModelFromInstance(instanceID)
        if err != nil {
-               return pkgerrors.Wrap(err, "Retrieving model info")
+               return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Retrieving model info")
        }
        // Get Template and Resolve the template with values
        crl, err := resolve(rbName, rbVersion, profileName, p, releaseName)
        if err != nil {
-               return pkgerrors.Wrap(err, "Resolve Config")
+               return []helm.KubernetesResource{}, pkgerrors.Wrap(err, "Resolve Config")
        }
+       var updatedResources (chan []helm.KubernetesResource) = make(chan []helm.KubernetesResource)
        crl.action = action
+       crl.resources = resources
+       crl.updatedResources = updatedResources
        // Send the configResourceList to the channel. Using select for non-blocking channel
+       log.Printf("Before Sent to goroutine %v", crl.profile)
        select {
        case pChannel <- crl:
                log.Printf("Message Sent to goroutine %v", crl.profile)
        default:
        }
 
-       return nil
+       var resultResources []helm.KubernetesResource = <-updatedResources
+       return resultResources, nil
 }
 
 // Per Profile Go routine to apply the configuration to Cloud Region
 func scheduleResources(c chan configResourceList) {
        // Keep thread running
+       log.Printf("[scheduleResources]: START thread")
        for {
                data := <-c
                //TODO: ADD Check to see if Application running
                ic := NewInstanceClient()
                resp, err := ic.Find(data.profile.RBName, data.profile.RBVersion, data.profile.ProfileName, nil)
-               if err != nil || len(resp) == 0 {
+               if (err != nil || len(resp) == 0) && data.action != "STOP" {
                        log.Println("Error finding a running instance. Retrying later...")
-                       time.Sleep(time.Second * 10)
+                       data.updatedResources <- []helm.KubernetesResource{}
                        continue
                }
+               breakThread := false
                switch {
                case data.action == "POST":
                        log.Printf("[scheduleResources]: POST %v %v", data.profile, data.resourceTemplates)
+                       var resources []helm.KubernetesResource
                        for _, inst := range resp {
                                k8sClient := KubernetesClient{}
                                err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
@@ -434,11 +509,11 @@ func scheduleResources(c chan configResourceList) {
                                        continue
                                }
                                //assuming - the resource is not exist already
-                               data.createdResources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
+                               resources, err = k8sClient.createResources(data.resourceTemplates, inst.Namespace)
                                errCreate := err
                                if err != nil {
                                        // assuming - the err represent the resource is already exist, so going for update
-                                       data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
+                                       resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
                                        if err != nil {
                                                log.Printf("Error Creating resources: %s", errCreate.Error())
                                                log.Printf("Error Updating resources: %s", err.Error())
@@ -446,8 +521,10 @@ func scheduleResources(c chan configResourceList) {
                                        }
                                }
                        }
+                       data.updatedResources <- resources
                case data.action == "PUT":
                        log.Printf("[scheduleResources]: PUT %v %v", data.profile, data.resourceTemplates)
+                       var resources []helm.KubernetesResource
                        for _, inst := range resp {
                                k8sClient := KubernetesClient{}
                                err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
@@ -456,14 +533,16 @@ func scheduleResources(c chan configResourceList) {
                                        //Move onto the next cloud region
                                        continue
                                }
-                               data.createdResources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
+
+                               resources, err = k8sClient.updateResources(data.resourceTemplates, inst.Namespace)
                                if err != nil {
                                        log.Printf("Error Updating resources: %s", err.Error())
                                        continue
                                }
                        }
+                       data.updatedResources <- resources
                case data.action == "DELETE":
-                       log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resourceTemplates)
+                       log.Printf("[scheduleResources]: DELETE %v %v", data.profile, data.resources)
                        for _, inst := range resp {
                                k8sClient := KubernetesClient{}
                                err = k8sClient.Init(inst.Request.CloudRegion, inst.ID)
@@ -472,14 +551,22 @@ func scheduleResources(c chan configResourceList) {
                                        //Move onto the next cloud region
                                        continue
                                }
-                               err = k8sClient.deleteResources(data.createdResources, inst.Namespace)
+                               err = k8sClient.deleteResources(data.resources, inst.Namespace)
                                if err != nil {
                                        log.Printf("Error Deleting resources: %s", err.Error())
                                        continue
                                }
                        }
+                       data.updatedResources <- []helm.KubernetesResource{}
+
+               case data.action == "STOP":
+                       breakThread = true
+               }
+               if breakThread {
+                       break
                }
        }
+       log.Printf("[scheduleResources]: STOP thread")
 }
 
 //Resolve returns the path where the helm chart merged with
@@ -570,6 +657,25 @@ func getProfileData(key string) (*sync.Mutex, chan configResourceList) {
        if !ok {
                profileData.resourceChannel[key] = make(chan configResourceList)
                go scheduleResources(profileData.resourceChannel[key])
+               time.Sleep(time.Second * 5)
        }
        return profileData.profileLockMap[key], profileData.resourceChannel[key]
 }
+
+func removeProfileData(key string) {
+       profileData.Lock()
+       defer profileData.Unlock()
+       _, ok := profileData.profileLockMap[key]
+       if ok {
+               delete(profileData.profileLockMap, key)
+       }
+       _, ok = profileData.resourceChannel[key]
+       if ok {
+               log.Printf("Stop config thread for %s", key)
+               crl := configResourceList{
+                       action: "STOP",
+               }
+               profileData.resourceChannel[key] <- crl
+               delete(profileData.resourceChannel, key)
+       }
+}
index dc4779d..0cc3c3c 100644 (file)
@@ -319,3 +319,6 @@ func TestRollbackConfig(t *testing.T) {
                })
        }
 }
+
+func main() {
+}
index 0f1f3d7..b7f382a 100644 (file)
@@ -709,7 +709,8 @@ func (v *InstanceClient) Delete(id string) error {
                return nil
        } else if inst.Status != "DONE" {
                //Recover is ongoing, do nothing here
-               return nil
+               //return nil
+               //TODO: implement recovery
        }
 
        k8sClient := KubernetesClient{}
@@ -718,12 +719,6 @@ func (v *InstanceClient) Delete(id string) error {
                return pkgerrors.Wrap(err, "Getting CloudRegion Information")
        }
 
-       configClient := NewConfigClient()
-       configs, err := configClient.List(id)
-       if err != nil {
-               return pkgerrors.Wrap(err, "Getting Configs Information")
-       }
-
        inst.Status = "PRE-DELETE"
        inst.HookProgress = ""
        err = db.DBconn.Update(v.storeName, key, v.tagInst, inst)
@@ -751,15 +746,10 @@ func (v *InstanceClient) Delete(id string) error {
                log.Printf("Update Instance DB Entry for release %s has error.", inst.ReleaseName)
        }
 
-       if len(configs) > 0 {
-               log.Printf("Deleting config resources first")
-               for _, config := range configs {
-                       log.Printf("Deleting Config %s Resources", config.ConfigName)
-                       _, err = configClient.Delete(id, config.ConfigName)
-                       if err != nil {
-                               return pkgerrors.Wrap(err, "Deleting Config Resources")
-                       }
-               }
+       configClient := NewConfigClient()
+       err = configClient.Cleanup(id)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Cleanup Config Resources")
        }
 
        err = k8sClient.deleteResources(inst.Resources, inst.Namespace)
index a435b43..e455cc1 100644 (file)
@@ -39,6 +39,7 @@ type EtcdStore interface {
        GetAll(key string) ([][]byte, error)
        Put(key, value string) error
        Delete(key string) error
+       DeletePrefix(keyPrefix string) error
 }
 
 // EtcdClient for Etcd
@@ -151,3 +152,16 @@ func (e EtcdClient) Delete(key string) error {
        }
        return nil
 }
+
+// Delete values by prefix from Etcd DB
+func (e EtcdClient) DeletePrefix(keyPrefix string) error {
+
+       if e.cli == nil {
+               return pkgerrors.Errorf("Etcd Client not initialized")
+       }
+       _, err := e.cli.Delete(context.Background(), keyPrefix, clientv3.WithPrefix())
+       if err != nil {
+               return pkgerrors.Errorf("Delete prefix failed etcd entry:%s", err.Error())
+       }
+       return nil
+}
index 9dfcad8..4b4dfe3 100644 (file)
@@ -55,3 +55,12 @@ func (c *MockEtcdClient) Delete(key string) error {
        delete(c.Items, key)
        return c.Err
 }
+
+func (c *MockEtcdClient) DeletePrefix(key string) error {
+       for kvKey := range c.Items {
+               if strings.HasPrefix(kvKey, key) {
+                       delete(c.Items, key)
+               }
+       }
+       return c.Err
+}
index f71c436..5815b74 100644 (file)
@@ -22,6 +22,7 @@ import (
 
        appsv1 "k8s.io/api/apps/v1"
        "k8s.io/client-go/kubernetes"
+
        //appsv1beta1 "k8s.io/api/apps/v1beta1"
        //appsv1beta2 "k8s.io/api/apps/v1beta2"
        batchv1 "k8s.io/api/batch/v1"
@@ -340,58 +341,58 @@ func (g genericPlugin) Create(yamlFilePath string, namespace string, client plug
 
 // Update deployment object in a specific Kubernetes cluster
 func (g genericPlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
-        if namespace == "" {
-                namespace = "default"
-        }
-
-        //Decode the yaml file to create a runtime.Object
-        unstruct := &unstructured.Unstructured{}
-        //Ignore the returned obj as we expect the data in unstruct
-        _, err := utils.DecodeYAML(yamlFilePath, unstruct)
-        if err != nil {
-                return "", pkgerrors.Wrap(err, "Decode deployment object error")
-        }
-
-        dynClient := client.GetDynamicClient()
-        mapper := client.GetMapper()
-
-        gvk := unstruct.GroupVersionKind()
-        mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
-        if err != nil {
-                return "", pkgerrors.Wrap(err, "Mapping kind to resource error")
-        }
-
-        //Add the tracking label to all resources created here
-        labels := unstruct.GetLabels()
-        //Check if labels exist for this object
-        if labels == nil {
-                labels = map[string]string{}
-        }
-        labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
-        unstruct.SetLabels(labels)
-
-        // This checks if the resource we are creating has a podSpec in it
-        // Eg: Deployment, StatefulSet, Job etc..
-        // If a PodSpec is found, the label will be added to it too.
-        plugin.TagPodsIfPresent(unstruct, client.GetInstanceID())
-
-        gvr := mapping.Resource
-        var updatedObj *unstructured.Unstructured
-
-        switch mapping.Scope.Name() {
-        case meta.RESTScopeNameNamespace:
-                updatedObj, err = dynClient.Resource(gvr).Namespace(namespace).Update(context.TODO(), unstruct, metav1.UpdateOptions{})
-        case meta.RESTScopeNameRoot:
-                updatedObj, err = dynClient.Resource(gvr).Update(context.TODO(), unstruct, metav1.UpdateOptions{})
-        default:
-                return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String())
-        }
-
-        if err != nil {
-                return "", pkgerrors.Wrap(err, "Update object error")
-        }
-
-        return updatedObj.GetName(), nil
+       if namespace == "" {
+               namespace = "default"
+       }
+
+       //Decode the yaml file to create a runtime.Object
+       unstruct := &unstructured.Unstructured{}
+       //Ignore the returned obj as we expect the data in unstruct
+       _, err := utils.DecodeYAML(yamlFilePath, unstruct)
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Decode deployment object error")
+       }
+
+       dynClient := client.GetDynamicClient()
+       mapper := client.GetMapper()
+
+       gvk := unstruct.GroupVersionKind()
+       mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Mapping kind to resource error")
+       }
+
+       //Add the tracking label to all resources created here
+       labels := unstruct.GetLabels()
+       //Check if labels exist for this object
+       if labels == nil {
+               labels = map[string]string{}
+       }
+       labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+       unstruct.SetLabels(labels)
+
+       // This checks if the resource we are creating has a podSpec in it
+       // Eg: Deployment, StatefulSet, Job etc..
+       // If a PodSpec is found, the label will be added to it too.
+       plugin.TagPodsIfPresent(unstruct, client.GetInstanceID())
+
+       gvr := mapping.Resource
+       var updatedObj *unstructured.Unstructured
+
+       switch mapping.Scope.Name() {
+       case meta.RESTScopeNameNamespace:
+               updatedObj, err = dynClient.Resource(gvr).Namespace(namespace).Update(context.TODO(), unstruct, metav1.UpdateOptions{})
+       case meta.RESTScopeNameRoot:
+               updatedObj, err = dynClient.Resource(gvr).Update(context.TODO(), unstruct, metav1.UpdateOptions{})
+       default:
+               return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String())
+       }
+
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Update object error")
+       }
+
+       return updatedObj.GetName(), nil
 }
 
 // Get an existing resource hosted in a specific Kubernetes cluster
@@ -425,7 +426,7 @@ func (g genericPlugin) Get(resource helm.KubernetesResource,
        }
 
        if err != nil {
-               return "", pkgerrors.Wrap(err, "Delete object error")
+               return "", pkgerrors.Wrap(err, "Get object error")
        }
 
        return unstruct.GetName(), nil
index 8732442..6c6d1f6 100644 (file)
@@ -21,10 +21,10 @@ import (
 
        pkgerrors "github.com/pkg/errors"
        coreV1 "k8s.io/api/core/v1"
-       metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-       "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/api/meta"
+       metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
 
@@ -60,7 +60,12 @@ func (p namespacePlugin) Create(yamlFilePath string, namespace string, client pl
                        Name: namespace,
                },
        }
-       _, err := client.GetStandardClient().CoreV1().Namespaces().Create(context.TODO(), namespaceObj, metaV1.CreateOptions{})
+       existingNs, err := client.GetStandardClient().CoreV1().Namespaces().Get(context.TODO(), namespace, metaV1.GetOptions{})
+       if err == nil && len(existingNs.ManagedFields) > 0 && existingNs.ManagedFields[0].Manager == "k8plugin" {
+               log.Printf("Namespace (%s) already ensured by plugin. Skip", namespace)
+               return namespace, nil
+       }
+       _, err = client.GetStandardClient().CoreV1().Namespaces().Create(context.TODO(), namespaceObj, metaV1.CreateOptions{})
        if err != nil {
                return "", pkgerrors.Wrap(err, "Create Namespace error")
        }
@@ -128,5 +133,5 @@ func (p namespacePlugin) List(gvk schema.GroupVersionKind, namespace string, cli
 
 func (p namespacePlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
 
-   return "", nil
+       return namespace, nil
 }
index aa5c685..52dd459 100644 (file)
@@ -21,10 +21,10 @@ import (
 
        pkgerrors "github.com/pkg/errors"
        coreV1 "k8s.io/api/core/v1"
-       metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-       "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/api/meta"
+       metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/rest"
 
@@ -156,8 +156,43 @@ func (p servicePlugin) Get(resource helm.KubernetesResource, namespace string, c
        return service.Name, nil
 }
 
+// Update a service object in a specific Kubernetes cluster
 func (p servicePlugin) Update(yamlFilePath string, namespace string, client plugin.KubernetesConnector) (string, error) {
+       if namespace == "" {
+               namespace = "default"
+       }
 
-        return "", nil
+       obj, err := utils.DecodeYAML(yamlFilePath, nil)
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Decode service object error")
+       }
 
+       service, ok := obj.(*coreV1.Service)
+       if !ok {
+               return "", pkgerrors.New("Decoded object contains another resource different than Service")
+       }
+       service.Namespace = namespace
+
+       existingService, err := client.GetStandardClient().CoreV1().Services(namespace).Get(context.TODO(), service.Name, metaV1.GetOptions{})
+       if err == nil {
+               service.ResourceVersion = existingService.ResourceVersion
+               service.Spec.ClusterIP = existingService.Spec.ClusterIP
+       } else {
+               return p.Create(yamlFilePath, namespace, client)
+       }
+       labels := service.GetLabels()
+       //Check if labels exist for this object
+       if labels == nil {
+               labels = map[string]string{}
+       }
+       labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+       service.SetLabels(labels)
+
+       _, err = client.GetStandardClient().CoreV1().Services(namespace).Update(context.TODO(), service, metaV1.UpdateOptions{})
+
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Update object error")
+       }
+
+       return service.Name, nil
 }