Instantiation and termination of a given context implementation. 69/109369/8
authorManjunath Ranganathaiah <manjunath.ranganathaiah@intel.com>
Fri, 19 Jun 2020 17:54:58 +0000 (17:54 +0000)
committerManjunath Ranganathaiah <manjunath.ranganathaiah@intel.com>
Wed, 24 Jun 2020 22:58:20 +0000 (22:58 +0000)
Issue-ID: MULTICLOUD-1005
Signed-off-by: Manjunath Ranganathaiah <manjunath.ranganathaiah@intel.com>
Change-Id: I60e11aaad97b60efc24a02866dc0e580507e5296

src/rsync/cmd/main.go
src/rsync/go.mod
src/rsync/go.sum
src/rsync/pkg/app/client.go [new file with mode: 0644]
src/rsync/pkg/connector/connector.go [new file with mode: 0644]
src/rsync/pkg/context/context.go [new file with mode: 0644]
src/rsync/pkg/grpc/installappserver/installappserver.go
src/rsync/pkg/internal/config/config.go [new file with mode: 0644]
src/rsync/pkg/internal/utils.go [new file with mode: 0644]
src/rsync/pkg/resource/resource.go [new file with mode: 0644]

index f46fa79..95c36e2 100644 (file)
@@ -81,7 +81,6 @@ func main() {
        // Initialize the mongodb
        err := db.InitializeDatabaseConnection("mco")
        if err != nil {
-               fmt.Println(" Exiting mongod ")
                log.Println("Unable to initialize database connection...")
                log.Println(err)
                log.Fatalln("Exiting...")
@@ -90,14 +89,13 @@ func main() {
        // Initialize contextdb
        err = contextDb.InitializeContextDatabase()
        if err != nil {
-               fmt.Println(" Exiting etcd")
                log.Println("Unable to initialize database connection...")
                log.Println(err)
                log.Fatalln("Exiting...")
        }
 
        // Start grpc
-       fmt.Println("starting rsync GRPC server..")
+       log.Println("starting rsync GRPC server..")
        err = startGrpcServer()
        if err != nil {
                log.Fatalf("GRPC server failed to start")
index 9c3362c..a2c5f83 100644 (file)
@@ -3,13 +3,15 @@ module github.com/onap/multicloud-k8s/src/rsync
 go 1.13
 
 require (
-       github.com/golang/protobuf v1.3.4
+       github.com/golang/protobuf v1.4.1
+       github.com/googleapis/gnostic v0.4.0
+       github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
+       github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
+       github.com/mattn/go-isatty v0.0.4 // indirect
+       github.com/modern-go/reflect2 v1.0.1 // indirect
        github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4
-       golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
-       google.golang.org/appengine v1.4.0 // indirect
+       go.etcd.io/bbolt v1.3.3 // indirect
        google.golang.org/grpc v1.27.1
-       honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
-       github.com/googleapis/gnostic v0.4.0
        k8s.io/kubernetes v1.14.1
 )
 
index 270417c..c49c5b0 100644 (file)
@@ -742,6 +742,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2El
 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
 google.golang.org/genproto v0.0.0-20200305110556-506484158171 h1:xes2Q2k+d/+YNXVw0FpZkIDJiaux4OVrRKXRAzH6A0U=
 google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
 google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
 google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@@ -765,6 +767,9 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
 google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
 google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2MY=
 google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.24.0 h1:UhZDfRO8JRQru4/+LlLE0BRKGF8L+PICnvYZmx/fEGA=
+google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/src/rsync/pkg/app/client.go b/src/rsync/pkg/app/client.go
new file mode 100644 (file)
index 0000000..fb57d46
--- /dev/null
@@ -0,0 +1,154 @@
+/*
+Copyright 2018 Intel Corporation.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package app
+
+import (
+       "os"
+       "strings"
+       "time"
+       "encoding/base64"
+
+       pkgerrors "github.com/pkg/errors"
+       "k8s.io/apimachinery/pkg/api/meta"
+       "k8s.io/client-go/discovery/cached/disk"
+       "k8s.io/client-go/dynamic"
+       "k8s.io/client-go/kubernetes"
+       "k8s.io/client-go/restmapper"
+       "k8s.io/client-go/tools/clientcmd"
+
+       "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
+)
+
+const basePath string = "/tmp/rsync/"
+
+// KubernetesClient encapsulates the different clients' interfaces
+// we need when interacting with a Kubernetes cluster
+type KubernetesClient struct {
+       clientSet      kubernetes.Interface
+       dynamicClient  dynamic.Interface
+       discoverClient *disk.CachedDiscoveryClient
+       restMapper     meta.RESTMapper
+       instanceID     string
+}
+
+// getKubeConfig uses the connectivity client to get the kubeconfig based on the name
+// of the clustername. This is written out to a file.
+func (k *KubernetesClient) getKubeConfig(clustername string, id string) (string, error) {
+
+       if !strings.Contains(clustername, "+") {
+               return "", pkgerrors.New("Not a valid cluster name")
+       }
+       strs := strings.Split(clustername, "+")
+       if len(strs) != 2 {
+               return "", pkgerrors.New("Not a valid cluster name")
+       }
+       kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+       if err != nil {
+               return "", pkgerrors.New("Get kubeconfig failed")
+       }
+
+       var kubeConfigPath string = basePath + id + "/" + clustername + "/"
+
+       if _, err := os.Stat(kubeConfigPath); os.IsNotExist(err) {
+               err = os.MkdirAll(kubeConfigPath, 0755)
+                       if err != nil {
+                               return "", err
+                       }
+       }
+       kubeConfigPath = kubeConfigPath + "config"
+
+       f, err := os.Create(kubeConfigPath)
+       defer f.Close()
+       if err != nil {
+               return "", err
+       }
+       dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+       if err != nil {
+               return "", err
+       }
+       _, err = f.Write(dec)
+       if err != nil {
+               return "", err
+       }
+
+       return kubeConfigPath, nil
+}
+
+// init loads the Kubernetes configuation values stored into the local configuration file
+func (k *KubernetesClient) Init(clustername string, iid string) error {
+       if clustername == "" {
+               return pkgerrors.New("Cloudregion is empty")
+       }
+
+       if iid == "" {
+               return pkgerrors.New("Instance ID is empty")
+       }
+
+       k.instanceID = iid
+
+       configPath, err := k.getKubeConfig(clustername, iid)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Get kubeconfig file")
+       }
+
+       //Remove kubeconfigfile after the clients are created
+       defer os.Remove(configPath)
+
+       config, err := clientcmd.BuildConfigFromFlags("", configPath)
+       if err != nil {
+               return pkgerrors.Wrap(err, "setConfig: Build config from flags raised an error")
+       }
+
+       k.clientSet, err = kubernetes.NewForConfig(config)
+       if err != nil {
+               return err
+       }
+
+       k.dynamicClient, err = dynamic.NewForConfig(config)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Creating dynamic client")
+       }
+
+       k.discoverClient, err = disk.NewCachedDiscoveryClientForConfig(config, os.TempDir(), "", 10*time.Minute)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Creating discovery client")
+       }
+
+       k.restMapper = restmapper.NewDeferredDiscoveryRESTMapper(k.discoverClient)
+
+       return nil
+}
+
+//GetMapper returns the RESTMapper that was created for this client
+func (k *KubernetesClient) GetMapper() meta.RESTMapper {
+       return k.restMapper
+}
+
+//GetDynamicClient returns the dynamic client that is needed for
+//unstructured REST calls to the apiserver
+func (k *KubernetesClient) GetDynamicClient() dynamic.Interface {
+       return k.dynamicClient
+}
+
+// GetStandardClient returns the standard client that can be used to handle
+// standard kubernetes kinds
+func (k *KubernetesClient) GetStandardClient() kubernetes.Interface {
+       return k.clientSet
+}
+
+//GetInstanceID returns the instanceID that is injected into all the
+//resources created by the plugin
+func (k *KubernetesClient) GetInstanceID() string {
+       return k.instanceID
+}
diff --git a/src/rsync/pkg/connector/connector.go b/src/rsync/pkg/connector/connector.go
new file mode 100644 (file)
index 0000000..6e17f87
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2019 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package connector
+
+import (
+       "log"
+
+       "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
+
+       corev1 "k8s.io/api/core/v1"
+       "k8s.io/apimachinery/pkg/api/meta"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/client-go/dynamic"
+       "k8s.io/client-go/kubernetes"
+)
+
+// KubernetesConnector is an interface that is expected to be implemented
+// by any code that calls the plugin framework functions.
+// It implements methods that are needed by the plugins to get Kubernetes
+// clients and other information needed to interface with Kubernetes
+type KubernetesConnector interface {
+       //GetMapper returns the RESTMapper that was created for this client
+       GetMapper() meta.RESTMapper
+
+       //GetDynamicClient returns the dynamic client that is needed for
+       //unstructured REST calls to the apiserver
+       GetDynamicClient() dynamic.Interface
+
+       // GetStandardClient returns the standard client that can be used to handle
+       // standard kubernetes kinds
+       GetStandardClient() kubernetes.Interface
+
+       //GetInstanceID returns the InstanceID for tracking during creation
+       GetInstanceID() string
+}
+
+// Reference is the interface that is implemented
+type Reference interface {
+       //Create a kubernetes resource described by the yaml in yamlFilePath
+       Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error)
+       //Delete a kubernetes resource described in the provided namespace
+       Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error
+}
+
+// TagPodsIfPresent finds the PodTemplateSpec from any workload
+// object that contains it and changes the spec to include the tag label
+func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) {
+
+       spec, ok := unstruct.Object["spec"].(map[string]interface{})
+       if !ok {
+               log.Println("Error converting spec to map")
+               return
+       }
+
+       template, ok := spec["template"].(map[string]interface{})
+       if !ok {
+               log.Println("Error converting template to map")
+               return
+       }
+
+       //Attempt to convert the template to a podtemplatespec.
+       //This is to check if we have any pods being created.
+       podTemplateSpec := &corev1.PodTemplateSpec{}
+       err := runtime.DefaultUnstructuredConverter.FromUnstructured(template, podTemplateSpec)
+       if err != nil {
+               log.Println("Did not find a podTemplateSpec: " + err.Error())
+               return
+       }
+
+       labels := podTemplateSpec.GetLabels()
+       if labels == nil {
+               labels = map[string]string{}
+       }
+       labels[config.GetConfiguration().KubernetesLabelName] = tag
+       podTemplateSpec.SetLabels(labels)
+
+       updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec)
+
+       //Set the label
+       spec["template"] = updatedTemplate
+}
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
new file mode 100644 (file)
index 0000000..67e589c
--- /dev/null
@@ -0,0 +1,547 @@
+/*
+ * Copyright 2020 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package context
+
+import (
+       "encoding/json"
+        "fmt"
+        "log"
+        "os"
+        "sync"
+        "strings"
+        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
+        pkgerrors "github.com/pkg/errors"
+        res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
+        con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+        "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
+)
+
+type CompositeAppContext struct {
+       cid    interface{}
+       appsorder      string
+       appsdependency string
+       appsmap []instMap
+}
+type clusterInfo struct {
+       name   string
+       resorder      string
+       resdependency string
+       ressmap []instMap
+}
+type instMap struct {
+       name    string
+       depinfo string
+       status  string
+       rerr    error
+       clusters []clusterInfo
+}
+
+const basePath string = "/tmp/rsync/"
+
+func getInstMap(order string, dependency string, level string) ([]instMap, error) {
+
+        if order == "" {
+              return nil, pkgerrors.Errorf("Not a valid order value")
+        }
+        if dependency == "" {
+              return nil, pkgerrors.Errorf("Not a valid dependency value")
+        }
+
+        if !(level == "app" || level == "res") {
+              return nil, pkgerrors.Errorf("Not a valid level name given to create map")
+        }
+
+
+        var aov map[string]interface{}
+        json.Unmarshal([]byte(order), &aov)
+
+        s := fmt.Sprintf("%vorder", level)
+        appso := aov[s].([]interface{})
+        var instmap = make([]instMap, len(appso))
+
+        var adv map[string]interface{}
+        json.Unmarshal([]byte(dependency), &adv)
+        s = fmt.Sprintf("%vdependency", level)
+        appsd := adv[s].(map[string]interface{})
+        for i, u := range appso {
+                instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
+        }
+
+        return instmap, nil
+}
+
+func deleteResource(clustername string, resname string, respath string) error {
+       k8sClient := app.KubernetesClient{}
+       err := k8sClient.Init(clustername, resname)
+       if err != nil {
+               log.Println("Init failed: " + err.Error())
+               return err
+       }
+
+       var c con.KubernetesConnector
+       c = &k8sClient
+       var gp res.Resource
+       err = gp.Delete(respath, resname, "default", c)
+       if err != nil {
+               log.Println("Delete resource failed: " +  err.Error())
+               return err
+       }
+       log.Println("Resource succesfully deleted")
+       return nil
+
+}
+
+func createResource(clustername string, resname string, respath string) error {
+       k8sClient := app.KubernetesClient{}
+       err := k8sClient.Init(clustername, resname)
+       if err != nil {
+               log.Println("Client init failed: " + err.Error())
+               return err
+       }
+
+       var c con.KubernetesConnector
+       c = &k8sClient
+       var gp res.Resource
+       _, err = gp.Create(respath,"default", c)
+       if err != nil {
+               log.Println("Create failed: " +  err.Error())
+               return err
+       }
+       log.Println("Resource succesfully created")
+       return nil
+
+}
+
+func terminateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
+       var resPath string = basePath + appname + "/" + clustername + "/resources/"
+       rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
+       if err != nil {
+               return err
+       }
+
+       resval, err := ac.GetValue(rh)
+       if err != nil {
+               return err
+       }
+
+       if resval != "" {
+               if _, err := os.Stat(resPath); os.IsNotExist(err) {
+                       err = os.MkdirAll(resPath, 0755)
+                       if err != nil {
+                               return err
+                       }
+               }
+               resPath := resPath + resmap.name + ".yaml"
+               f, err := os.Create(resPath)
+               defer f.Close()
+               if err != nil {
+                       return err
+               }
+               _, err = f.WriteString(resval.(string))
+               if err != nil {
+                       return err
+               }
+               result := strings.Split(resmap.name, "+")
+               if result[0] == "" {
+                       return pkgerrors.Errorf("Resource name is nil")
+               }
+               err = deleteResource(clustername, result[0], resPath)
+               if err != nil {
+                       return err
+               }
+               //defer os.Remove(resPath)
+       } else {
+               return pkgerrors.Errorf("Resource value is nil")
+       }
+
+       return nil
+
+}
+
+func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
+       var resPath string = basePath + appname + "/" + clustername + "/resources/"
+       rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
+       if err != nil {
+               return err
+       }
+
+       resval, err := ac.GetValue(rh)
+       if err != nil {
+               return err
+       }
+
+       if resval != "" {
+               if _, err := os.Stat(resPath); os.IsNotExist(err) {
+                       err = os.MkdirAll(resPath, 0755)
+                       if err != nil {
+                               return err
+                       }
+               }
+               resPath := resPath + resmap.name + ".yaml"
+               f, err := os.Create(resPath)
+               defer f.Close()
+               if err != nil {
+                       return err
+               }
+               _, err = f.WriteString(resval.(string))
+               if err != nil {
+                       return err
+               }
+               result := strings.Split(resmap.name, "+")
+               if result[0] == "" {
+                       return pkgerrors.Errorf("Resource name is nil")
+               }
+               err = createResource(clustername, result[0], resPath)
+               if err != nil {
+                       return err
+               }
+               //defer os.Remove(resPath)
+       } else {
+               return pkgerrors.Errorf("Resource value is nil")
+       }
+
+       return nil
+
+}
+
+func  terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+        var wg sync.WaitGroup
+        var chans = make([]chan int, len(ressmap))
+        for l := range chans {
+                chans[l] = make(chan int)
+        }
+        for i:=0; i<len(ressmap); i++ {
+                wg.Add(1)
+                go func(index int) {
+                        if ressmap[index].depinfo == "go" {
+                                ressmap[index].status = "start"
+                        } else {
+                                ressmap[index].status = "waiting"
+                                c := <- chans[index]
+                                if c != index {
+                                       ressmap[index].status = "error"
+                                        ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+                                       wg.Done()
+                                       return
+                                }
+                                ressmap[index].status = "start"
+                        }
+                        ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
+                        ressmap[index].status = "done"
+                        waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
+                        for j:=0; j<len(ressmap); j++ {
+                                if ressmap[j].depinfo == waitstr {
+                                        chans[j] <- j
+                                }
+                        }
+                        wg.Done()
+                }(i)
+        }
+        wg.Wait()
+        for k:=0; k<len(ressmap); k++ {
+                if ressmap[k].rerr != nil {
+                        return pkgerrors.Errorf("error during resources termination")
+                }
+        }
+        return nil
+
+}
+
+func  instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+        var wg sync.WaitGroup
+        var chans = make([]chan int, len(ressmap))
+        for l := range chans {
+                chans[l] = make(chan int)
+        }
+        for i:=0; i<len(ressmap); i++ {
+                wg.Add(1)
+                go func(index int) {
+                        if ressmap[index].depinfo == "go" {
+                                ressmap[index].status = "start"
+                        } else {
+                                ressmap[index].status = "waiting"
+                                c := <- chans[index]
+                                if c != index {
+                                       ressmap[index].status = "error"
+                                        ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+                                       wg.Done()
+                                       return
+                                }
+                                ressmap[index].status = "start"
+                        }
+                        ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername)
+                        ressmap[index].status = "done"
+                        waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
+                        for j:=0; j<len(ressmap); j++ {
+                                if ressmap[j].depinfo == waitstr {
+                                        chans[j] <- j
+                                }
+                        }
+                        wg.Done()
+                }(i)
+        }
+        wg.Wait()
+        for k:=0; k<len(ressmap); k++ {
+                if ressmap[k].rerr != nil {
+                        return pkgerrors.Errorf("error during resources instantiation")
+                }
+        }
+        return nil
+
+}
+
+func terminateApp(ac appcontext.AppContext, appmap instMap) error {
+
+        for i:=0; i<len(appmap.clusters); i++ {
+               err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name,
+                               appmap.clusters[i].name)
+               if err != nil {
+                       return err
+               }
+       }
+       log.Println("Termination of app done: " + appmap.name)
+
+       return nil
+
+}
+
+
+func instantiateApp(ac appcontext.AppContext, appmap instMap) error {
+
+        for i:=0; i<len(appmap.clusters); i++ {
+               err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name,
+                               appmap.clusters[i].name)
+               if err != nil {
+                       return err
+               }
+       }
+       log.Println("Instantiation of app done: " + appmap.name)
+       return nil
+
+}
+
+func  instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
+       var wg sync.WaitGroup
+       var chans = make([]chan int, len(appsmap))
+       for l := range chans {
+               chans[l] = make(chan int)
+       }
+        for i:=0; i<len(appsmap); i++ {
+               wg.Add(1)
+                go func(index int) {
+                        if appsmap[index].depinfo == "go" {
+                               appsmap[index].status = "start"
+                       } else {
+                               appsmap[index].status = "waiting"
+                               c := <- chans[index]
+                               if c != index {
+                                       appsmap[index].status = "error"
+                                        appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+                                       wg.Done()
+                                       return
+                               }
+                               appsmap[index].status = "start"
+                       }
+                       appsmap[index].rerr = instantiateApp(ac, appsmap[index])
+                       appsmap[index].status = "done"
+                       waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
+                       for j:=0; j<len(appsmap); j++ {
+                               if appsmap[j].depinfo == waitstr {
+                                       chans[j] <- j
+                               }
+                       }
+                       wg.Done()
+                }(i)
+        }
+       wg.Wait()
+       for k:=0; k<len(appsmap); k++ {
+               if appsmap[k].rerr != nil {
+                       return pkgerrors.Errorf("error during apps instantiation")
+               }
+       }
+       return nil
+
+}
+
+func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
+       ac := appcontext.AppContext{}
+
+       _, err := ac.LoadAppContext(cid)
+        if err != nil {
+                return err
+        }
+       instca.cid = cid
+
+       appsorder, err := ac.GetAppInstruction("order")
+        if err != nil {
+                return err
+        }
+       instca.appsorder = appsorder.(string)
+       appsdependency, err := ac.GetAppInstruction("dependency")
+        if err != nil {
+                return err
+        }
+       instca.appsdependency = appsdependency.(string)
+        instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
+        if err != nil {
+                return err
+        }
+
+       for j:=0; j<len(instca.appsmap); j++ {
+               clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
+               if err != nil {
+                       return err
+               }
+               instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
+               for k:=0; k<len(clusternames); k++ {
+                       instca.appsmap[j].clusters[k].name = clusternames[k]
+                       resorder, err := ac.GetResourceInstruction(
+                                       instca.appsmap[j].name, clusternames[k], "order")
+                       if err != nil {
+                               return err
+                       }
+                       instca.appsmap[j].clusters[k].resorder = resorder.(string)
+
+                       resdependency, err := ac.GetResourceInstruction(
+                                       instca.appsmap[j].name, clusternames[k], "dependency")
+                       if err != nil {
+                               return err
+                       }
+                       instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
+
+                       instca.appsmap[j].clusters[k].ressmap, err = getInstMap(
+                               instca.appsmap[j].clusters[k].resorder,
+                               instca.appsmap[j].clusters[k].resdependency, "res")
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+       err = instantiateApps(ac, instca.appsmap)
+        if err != nil {
+                return err
+        }
+
+       return nil
+}
+
+// Delete all the apps
+func  terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
+       var wg sync.WaitGroup
+       var chans = make([]chan int, len(appsmap))
+       for l := range chans {
+               chans[l] = make(chan int)
+       }
+        for i:=0; i<len(appsmap); i++ {
+               wg.Add(1)
+                go func(index int) {
+                        if appsmap[index].depinfo == "go" {
+                               appsmap[index].status = "start"
+                       } else {
+                               appsmap[index].status = "waiting"
+                               c := <- chans[index]
+                               if c != index {
+                                       appsmap[index].status = "error"
+                                        appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+                                       wg.Done()
+                                       return
+                               }
+                               appsmap[index].status = "start"
+                       }
+                       appsmap[index].rerr = terminateApp(ac, appsmap[index])
+                       appsmap[index].status = "done"
+                       waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
+                       for j:=0; j<len(appsmap); j++ {
+                               if appsmap[j].depinfo == waitstr {
+                                       chans[j] <- j
+                               }
+                       }
+                       wg.Done()
+                }(i)
+        }
+       wg.Wait()
+       for k:=0; k<len(appsmap); k++ {
+               if appsmap[k].rerr != nil {
+                       return pkgerrors.Errorf("error during apps instantiation")
+               }
+       }
+       return nil
+
+}
+// Delete all the resources for a given context
+func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
+       ac := appcontext.AppContext{}
+
+       _, err := ac.LoadAppContext(cid)
+        if err != nil {
+                return err
+        }
+       instca.cid = cid
+
+       appsorder, err := ac.GetAppInstruction("order")
+        if err != nil {
+                return err
+        }
+       instca.appsorder = appsorder.(string)
+       appsdependency, err := ac.GetAppInstruction("dependency")
+        if err != nil {
+                return err
+        }
+       instca.appsdependency = appsdependency.(string)
+        instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
+        if err != nil {
+                return err
+        }
+
+       for j:=0; j<len(instca.appsmap); j++ {
+               clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
+               if err != nil {
+                       return err
+               }
+               instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
+               for k:=0; k<len(clusternames); k++ {
+                       instca.appsmap[j].clusters[k].name = clusternames[k]
+                       resorder, err := ac.GetResourceInstruction(
+                                       instca.appsmap[j].name, clusternames[k], "order")
+                       if err != nil {
+                               return err
+                       }
+                       instca.appsmap[j].clusters[k].resorder = resorder.(string)
+
+                       resdependency, err := ac.GetResourceInstruction(
+                                       instca.appsmap[j].name, clusternames[k], "dependency")
+                       if err != nil {
+                               return err
+                       }
+                       instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
+
+                       instca.appsmap[j].clusters[k].ressmap, err = getInstMap(
+                               instca.appsmap[j].clusters[k].resorder,
+                               instca.appsmap[j].clusters[k].resdependency, "res")
+                       if err != nil {
+                               return err
+                       }
+               }
+       }
+       err = terminateApps(ac, instca.appsmap)
+        if err != nil {
+                return err
+        }
+
+       return nil
+
+}
index 28b4a58..68118ad 100644 (file)
@@ -17,10 +17,8 @@ import (
        "context"
        "encoding/json"
        "log"
-
        "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp"
-       //"google.golang.org/grpc/codes"
-       //"google.golang.org/grpc/status"
+       con "github.com/onap/multicloud-k8s/src/rsync/pkg/context"
 )
 
 type installappServer struct {
@@ -31,10 +29,17 @@ func (cs *installappServer) InstallApp(ctx context.Context, req *installapp.Inst
        installAppReq, _ := json.Marshal(req)
        log.Println("GRPC Server received installAppRequest: ", string(installAppReq))
 
-       // Insert call to Server Functionality here
-       //
-       //
-
+       // Try instantiate the comp app 
+       instca := con.CompositeAppContext{}
+        err := instca.InstantiateComApp(req.GetAppContext())
+        if err != nil {
+                log.Println("Instantiation failed: " +  err.Error())
+               err := instca.TerminateComApp(req.GetAppContext())
+               if err != nil {
+                       log.Println("Termination failed: " + err.Error())
+               }
+               return &installapp.InstallAppResponse{AppContextInstalled: false}, err
+        }
        return &installapp.InstallAppResponse{AppContextInstalled: true}, nil
 }
 
@@ -43,8 +48,12 @@ func (cs *installappServer) UninstallApp(ctx context.Context, req *installapp.Un
        log.Println("GRPC Server received uninstallAppRequest: ", string(uninstallAppReq))
 
        // Try terminating the comp app here
-       //
-       //
+       instca := con.CompositeAppContext{}
+       err := instca.TerminateComApp(req.GetAppContext())
+       if err != nil {
+               log.Println("Termination failed: " + err.Error())
+               return &installapp.UninstallAppResponse{AppContextUninstalled: false}, err
+       }
 
        return &installapp.UninstallAppResponse{AppContextUninstalled: true}, nil
 }
diff --git a/src/rsync/pkg/internal/config/config.go b/src/rsync/pkg/internal/config/config.go
new file mode 100644 (file)
index 0000000..89f2553
--- /dev/null
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2019 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+       "encoding/json"
+       "log"
+       "os"
+       "reflect"
+)
+
+// Configuration loads up all the values that are used to configure
+// backend implementations
+type Configuration struct {
+       CAFile              string `json:"ca-file"`
+       ServerCert          string `json:"server-cert"`
+       ServerKey           string `json:"server-key"`
+       Password            string `json:"password"`
+       DatabaseAddress     string `json:"database-address"`
+       DatabaseType        string `json:"database-type"`
+       PluginDir           string `json:"plugin-dir"`
+       EtcdIP              string `json:"etcd-ip"`
+       EtcdCert            string `json:"etcd-cert"`
+       EtcdKey             string `json:"etcd-key"`
+       EtcdCAFile          string `json:"etcd-ca-file"`
+       ServicePort         string `json:"service-port"`
+       KubernetesLabelName string `json:"kubernetes-label-name"`
+}
+
+// Config is the structure that stores the configuration
+var gConfig *Configuration
+
+// readConfigFile reads the specified smsConfig file to setup some env variables
+func readConfigFile(file string) (*Configuration, error) {
+       f, err := os.Open(file)
+       if err != nil {
+               return defaultConfiguration(), err
+       }
+       defer f.Close()
+
+       // Setup some defaults here
+       // If the json file has values in it, the defaults will be overwritten
+       conf := defaultConfiguration()
+
+       // Read the configuration from json file
+       decoder := json.NewDecoder(f)
+       err = decoder.Decode(conf)
+       if err != nil {
+               return conf, err
+       }
+
+       return conf, nil
+}
+
+func defaultConfiguration() *Configuration {
+       cwd, err := os.Getwd()
+       if err != nil {
+               log.Println("Error getting cwd. Using .")
+               cwd = "."
+       }
+
+       return &Configuration{
+               CAFile:              "ca.cert",
+               ServerCert:          "server.cert",
+               ServerKey:           "server.key",
+               Password:            "",
+               DatabaseAddress:     "127.0.0.1",
+               DatabaseType:        "mongo",
+               PluginDir:           cwd,
+               EtcdIP:              "127.0.0.1",
+               EtcdCert:            "",
+               EtcdKey:             "",
+               EtcdCAFile:          "",
+               ServicePort:         "9015",
+               KubernetesLabelName: "k8splugin.io/rb-instance-id",
+       }
+}
+
+// GetConfiguration returns the configuration for the app.
+// It will try to load it if it is not already loaded.
+func GetConfiguration() *Configuration {
+       if gConfig == nil {
+               conf, err := readConfigFile("k8sconfig.json")
+               if err != nil {
+                       log.Println("Error loading config file. Using defaults.")
+               }
+               gConfig = conf
+       }
+
+       return gConfig
+}
+
+// SetConfigValue sets a value in the configuration
+// This is mostly used to customize the application and
+// should be used carefully.
+func SetConfigValue(key string, value string) *Configuration {
+       c := GetConfiguration()
+       if value == "" || key == "" {
+               return c
+       }
+
+       v := reflect.ValueOf(c).Elem()
+       if v.Kind() == reflect.Struct {
+               f := v.FieldByName(key)
+               if f.IsValid() {
+                       if f.CanSet() {
+                               if f.Kind() == reflect.String {
+                                       f.SetString(value)
+                               }
+                       }
+               }
+       }
+       return c
+}
diff --git a/src/rsync/pkg/internal/utils.go b/src/rsync/pkg/internal/utils.go
new file mode 100644 (file)
index 0000000..59ff6df
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+Copyright 2018 Intel Corporation.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+       "io/ioutil"
+       "os"
+       "path"
+
+       pkgerrors "github.com/pkg/errors"
+       "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/client-go/kubernetes/scheme"
+)
+
+// DecodeYAML reads a YAMl file to extract the Kubernetes object definition
+func DecodeYAML(path string, into runtime.Object) (runtime.Object, error) {
+       if _, err := os.Stat(path); err != nil {
+               if os.IsNotExist(err) {
+                       return nil, pkgerrors.New("File " + path + " not found")
+               } else {
+                       return nil, pkgerrors.Wrap(err, "Stat file error")
+               }
+       }
+
+       rawBytes, err := ioutil.ReadFile(path)
+       if err != nil {
+               return nil, pkgerrors.Wrap(err, "Read YAML file error")
+       }
+
+       decode := scheme.Codecs.UniversalDeserializer().Decode
+       obj, _, err := decode(rawBytes, nil, into)
+       if err != nil {
+               return nil, pkgerrors.Wrap(err, "Deserialize YAML error")
+       }
+
+       return obj, nil
+}
+
+//EnsureDirectory makes sure that the directories specified in the path exist
+//If not, it will create them, if possible.
+func EnsureDirectory(f string) error {
+       base := path.Dir(f)
+       _, err := os.Stat(base)
+       if err != nil && !os.IsNotExist(err) {
+               return err
+       }
+       return os.MkdirAll(base, 0755)
+}
diff --git a/src/rsync/pkg/resource/resource.go b/src/rsync/pkg/resource/resource.go
new file mode 100644 (file)
index 0000000..9d71569
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+Copyright 2018 Intel Corporation.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resource
+
+import (
+       pkgerrors "github.com/pkg/errors"
+       "k8s.io/apimachinery/pkg/api/meta"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+       "k8s.io/apimachinery/pkg/runtime/schema"
+
+       utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
+       "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
+       "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+)
+
+type Resource struct {
+}
+
+// Create deployment object in a specific Kubernetes cluster
+func (r Resource) Create(yamlFilePath string, namespace string, client connector.KubernetesConnector) (string, error) {
+       if namespace == "" {
+               namespace = "default"
+       }
+
+       //Decode the yaml file to create a runtime.Object
+       unstruct := &unstructured.Unstructured{}
+       //Ignore the returned obj as we expect the data in unstruct
+       _, err := utils.DecodeYAML(yamlFilePath, unstruct)
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Decode deployment object error")
+       }
+
+       dynClient := client.GetDynamicClient()
+       mapper := client.GetMapper()
+
+       gvk := unstruct.GroupVersionKind()
+       mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Mapping kind to resource error")
+       }
+
+       //Add the tracking label to all resources created here
+       labels := unstruct.GetLabels()
+       //Check if labels exist for this object
+       if labels == nil {
+               labels = map[string]string{}
+       }
+       labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+       unstruct.SetLabels(labels)
+
+       // This checks if the resource we are creating has a podSpec in it
+       // Eg: Deployment, StatefulSet, Job etc..
+       // If a PodSpec is found, the label will be added to it too.
+       connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+
+       gvr := mapping.Resource
+       var createdObj *unstructured.Unstructured
+
+       switch mapping.Scope.Name() {
+       case meta.RESTScopeNameNamespace:
+               createdObj, err = dynClient.Resource(gvr).Namespace(namespace).Create(unstruct, metav1.CreateOptions{})
+       case meta.RESTScopeNameRoot:
+               createdObj, err = dynClient.Resource(gvr).Create(unstruct, metav1.CreateOptions{})
+       default:
+               return "", pkgerrors.New("Got an unknown RESTSCopeName for mapping: " + gvk.String())
+       }
+
+       if err != nil {
+               return "", pkgerrors.Wrap(err, "Create object error")
+       }
+
+       return createdObj.GetName(), nil
+}
+
+// Delete an existing resource hosted in a specific Kubernetes cluster
+func (r Resource) Delete(yamlFilePath string, resname string, namespace string, client connector.KubernetesConnector) error {
+        if namespace == "" {
+                namespace = "default"
+        }
+
+        //Decode the yaml file to create a runtime.Object
+        unstruct := &unstructured.Unstructured{}
+        //Ignore the returned obj as we expect the data in unstruct
+        _, err := utils.DecodeYAML(yamlFilePath, unstruct)
+        if err != nil {
+                return pkgerrors.Wrap(err, "Decode deployment object error")
+        }
+
+        dynClient := client.GetDynamicClient()
+        mapper := client.GetMapper()
+
+        gvk := unstruct.GroupVersionKind()
+        mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+        if err != nil {
+                return pkgerrors.Wrap(err, "Mapping kind to resource error")
+        }
+
+        gvr := mapping.Resource
+        deletePolicy := metav1.DeletePropagationForeground
+        opts := &metav1.DeleteOptions{
+                PropagationPolicy: &deletePolicy,
+        }
+
+        switch mapping.Scope.Name() {
+        case meta.RESTScopeNameNamespace:
+                err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
+        case meta.RESTScopeNameRoot:
+                err = dynClient.Resource(gvr).Delete(resname, opts)
+        default:
+                return pkgerrors.New("Got an unknown RESTSCopeName for mappin")
+        }
+
+        if err != nil {
+                return pkgerrors.Wrap(err, "Delete object error")
+        }
+        return nil
+}