Rsync change behaviour on error handling 35/110535/2
authorRitu Sood <ritu.sood@intel.com>
Thu, 23 Jul 2020 22:56:20 +0000 (15:56 -0700)
committerEric Multanen <eric.w.multanen@intel.com>
Fri, 7 Aug 2020 19:02:19 +0000 (12:02 -0700)
If an error occurs in any resource, stop processing
and end all goroutines. Also return the gRPC
call immediately after starting the goroutine. Adds
retry checks for cluster reachability as well.

Issue-ID: MULTICLOUD-1005
Signed-off-by: Ritu Sood <ritu.sood@intel.com>
Change-Id: I1189e02f0c0426181fdc995a0c4816ceaa64ec7d

src/rsync/go.mod
src/rsync/pkg/context/context.go

index 18fef6b..2072510 100644 (file)
@@ -5,6 +5,7 @@ go 1.13
 require (
        //client
        github.com/evanphx/json-patch v4.5.0+incompatible // indirect
+       github.com/ghodss/yaml v1.0.0
        github.com/golang/protobuf v1.4.1
        github.com/googleapis/gnostic v0.4.0
        github.com/jonboulle/clockwork v0.1.0
@@ -29,8 +30,8 @@ require (
 
 replace (
        github.com/onap/multicloud-k8s/src/clm => ../clm
-       github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator
        github.com/onap/multicloud-k8s/src/monitor => ../monitor
+       github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator
        k8s.io/api => k8s.io/api v0.17.3
        k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.3
        k8s.io/apimachinery => k8s.io/apimachinery v0.17.3
index cc7773b..2fadfd0 100644 (file)
@@ -22,6 +22,7 @@ import (
        "fmt"
        "log"
        "strings"
+       "time"
 
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
@@ -129,6 +130,25 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st
        return nil
 }
 
+// waitTime is the interval, in seconds, between cluster reachability checks.
+const waitTime = 2
+
+func waitForClusterReady(c *kubeclient.Client, cluster string) error {
+       for {
+               if err := c.IsReachable(); err != nil {
+                       // TODO: Add more realistic error checking
+                       // TODO: Add Incremental wait logic here
+                       time.Sleep(waitTime * time.Second)
+               } else {
+                       break
+               }
+       }
+       logutils.Info("Cluster is reachable::", logutils.Fields{
+               "cluster": cluster,
+       })
+       return nil
+}
+
 func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error {
 
        b, err := status.GetStatusCR(label)
@@ -139,6 +159,7 @@ func addStatusTracker(c *kubeclient.Client, app string, cluster string, label st
                })
                return err
        }
+       // TODO: Check reachability?
        if err = c.Apply(b); err != nil {
                logutils.Error("Failed to apply status tracker", logutils.Fields{
                        "error":   err,
@@ -185,9 +206,12 @@ type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, ap
 
 type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error
 
-func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error {
+func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error {
+
+       con := connector.Init(cid)
+       //Cleanup
+       defer con.RemoveClient()
        ac := appcontext.AppContext{}
-       g, _ := errgroup.WithContext(context.Background())
        _, err := ac.LoadAppContext(cid)
        if err != nil {
                return err
@@ -203,9 +227,9 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
                "string":    appList,
        })
        id, _ := ac.GetCompositeAppHandle()
-
+       g, _ := errgroup.WithContext(context.Background())
+       // Iterate over all the subapps
        for _, app := range appList["apporder"] {
-
                appName := app
                results := strings.Split(id.(string), "/")
                label := results[2] + "-" + app
@@ -214,14 +238,14 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
                        if err != nil {
                                return err
                        }
-                       rg, _ := errgroup.WithContext(context.Background())
+                       // Iterate over all clusters
                        for k := 0; k < len(clusterNames); k++ {
                                cluster := clusterNames[k]
                                err = status.StartClusterWatcher(cluster)
                                if err != nil {
                                        log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err)
                                }
-                               rg.Go(func() error {
+                               g.Go(func() error {
                                        c, err := con.GetClient(cluster)
                                        if err != nil {
                                                logutils.Error("Error in creating kubeconfig client", logutils.Fields{
@@ -238,42 +262,58 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
                                        }
                                        var aov map[string][]string
                                        json.Unmarshal([]byte(resorder.(string)), &aov)
-                                       for i, res := range aov["resorder"] {
-                                               err = f(ac, c, res, appName, cluster, label)
+                                       // Keep retrying for reachability
+                                       for {
+                                               // Wait for cluster to be reachable
+                                               err = waitForClusterReady(c, cluster)
                                                if err != nil {
-                                                       logutils.Error("Error in resource %s: %v", logutils.Fields{
-                                                               "error":    err,
-                                                               "cluster":  cluster,
-                                                               "resource": res,
-                                                       })
-                                                       if breakonError {
-                                                               // handle status tracking before exiting if at least one resource got handled
-                                                               if i > 0 {
-                                                                       serr := sfn(c, appName, cluster, label)
-                                                                       if serr != nil {
-                                                                               logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+                                                       // TODO: Add error handling
+                                                       return err
+                                               }
+                                               reachable := true
+                                               // Handle all resources in order
+                                               for i, res := range aov["resorder"] {
+                                                       err = f(ac, c, res, appName, cluster, label)
+                                                       if err != nil {
+                                                               logutils.Error("Error in resource %s: %v", logutils.Fields{
+                                                                       "error":    err,
+                                                                       "cluster":  cluster,
+                                                                       "resource": res,
+                                                               })
+                                                               // If the failure is due to reachability issues, start retrying
+                                                               if err = c.IsReachable(); err != nil {
+                                                                       reachable = false
+                                                                       break
+                                                               }
+                                                               if breakonError {
+                                                                       // handle status tracking before exiting if at least one resource got handled
+                                                                       if i > 0 {
+                                                                               serr := sfn(c, appName, cluster, label)
+                                                                               if serr != nil {
+                                                                                       logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+                                                                               }
                                                                        }
+                                                                       return err
                                                                }
-                                                               return err
                                                        }
                                                }
-                                       }
-                                       serr := sfn(c, appName, cluster, label)
-                                       if serr != nil {
-                                               logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+                                               // Check whether the loop was exited due to reachability issues
+                                               if reachable != false {
+                                                       serr := sfn(c, appName, cluster, label)
+                                                       if serr != nil {
+                                                               logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr})
+                                                       }
+                                                       // Done processing cluster without errors
+                                                       return nil
+                                               }
                                        }
                                        return nil
                                })
                        }
-                       if err := rg.Wait(); err != nil {
-                               logutils.Error("Encountered error in App cluster", logutils.Fields{
-                                       "error": err,
-                               })
-                               return err
-                       }
                        return nil
                })
        }
+       // Wait for all subtasks to complete
        if err := g.Wait(); err != nil {
                logutils.Error("Encountered error", logutils.Fields{
                        "error": err,
@@ -285,28 +325,14 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn
 
 // InstantiateComApp Instantiate Apps in Composite App
 func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
-       con := connector.Init(cid)
-       err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true)
-       if err != nil {
-               logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err})
-               return err
-       }
-       //Cleanup
-       con.RemoveClient()
+       // Start processing asynchronously and return the gRPC call immediately
+       go applyFnComApp(cid, instantiateResource, addStatusTracker, true)
        return nil
 }
 
 // TerminateComApp Terminates Apps in Composite App
 func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
-       con := connector.Init(cid)
-       err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false)
-       if err != nil {
-               logutils.Error("TerminateComApp unsuccessful", logutils.Fields{
-                       "error": err,
-               })
-               return err
-       }
-       //Cleanup
-       con.RemoveClient()
+       // Start processing asynchronously and return the gRPC call immediately
+       go applyFnComApp(cid, terminateResource, deleteStatusTracker, true)
        return nil
 }