From: Ritu Sood Date: Thu, 23 Jul 2020 22:56:20 +0000 (-0700) Subject: Rsync change behaviour on error handling X-Git-Tag: 0.7.0~64^2 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=e7061c31f693f0ee60040a67baaa3935c64786cb;p=multicloud%2Fk8s.git Rsync change behaviour on error handling If an error occurs in any resource, stop processing and end all goroutines. Also return the gRPC call immediately after starting the goroutine. Also adds retry checks. Issue-ID: MULTICLOUD-1005 Signed-off-by: Ritu Sood Change-Id: I1189e02f0c0426181fdc995a0c4816ceaa64ec7d --- diff --git a/src/rsync/go.mod b/src/rsync/go.mod index 18fef6be..2072510a 100644 --- a/src/rsync/go.mod +++ b/src/rsync/go.mod @@ -5,6 +5,7 @@ go 1.13 require ( //client github.com/evanphx/json-patch v4.5.0+incompatible // indirect + github.com/ghodss/yaml v1.0.0 github.com/golang/protobuf v1.4.1 github.com/googleapis/gnostic v0.4.0 github.com/jonboulle/clockwork v0.1.0 @@ -29,8 +30,8 @@ require ( replace ( github.com/onap/multicloud-k8s/src/clm => ../clm - github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator github.com/onap/multicloud-k8s/src/monitor => ../monitor + github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator k8s.io/api => k8s.io/api v0.17.3 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.3 k8s.io/apimachinery => k8s.io/apimachinery v0.17.3 diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go index cc7773b8..2fadfd00 100644 --- a/src/rsync/pkg/context/context.go +++ b/src/rsync/pkg/context/context.go @@ -22,6 +22,7 @@ import ( "fmt" "log" "strings" + "time" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils" @@ -129,6 +130,25 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st return nil } +// Wait for 2 secs +const waitTime = 2 + +func waitForClusterReady(c *kubeclient.Client, cluster string) error { + for { + if err := 
c.IsReachable(); err != nil { + // TODO: Add more realistic error checking + // TODO: Add Incremental wait logic here + time.Sleep(waitTime * time.Second) + } else { + break + } + } + logutils.Info("Cluster is reachable::", logutils.Fields{ + "cluster": cluster, + }) + return nil +} + func addStatusTracker(c *kubeclient.Client, app string, cluster string, label string) error { b, err := status.GetStatusCR(label) @@ -139,6 +159,7 @@ func addStatusTracker(c *kubeclient.Client, app string, cluster string, label st }) return err } + // TODO: Check reachability? if err = c.Apply(b); err != nil { logutils.Error("Failed to apply status tracker", logutils.Fields{ "error": err, @@ -185,9 +206,12 @@ type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, ap type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error -func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn, breakonError bool) error { +func applyFnComApp(cid interface{}, f fn, sfn statusfn, breakonError bool) error { + + con := connector.Init(cid) + //Cleanup + defer con.RemoveClient() ac := appcontext.AppContext{} - g, _ := errgroup.WithContext(context.Background()) _, err := ac.LoadAppContext(cid) if err != nil { return err @@ -203,9 +227,9 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn "string": appList, }) id, _ := ac.GetCompositeAppHandle() - + g, _ := errgroup.WithContext(context.Background()) + // Iterate over all the subapps for _, app := range appList["apporder"] { - appName := app results := strings.Split(id.(string), "/") label := results[2] + "-" + app @@ -214,14 +238,14 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn if err != nil { return err } - rg, _ := errgroup.WithContext(context.Background()) + // Iterate over all clusters for k := 0; k < len(clusterNames); k++ { cluster := clusterNames[k] err = status.StartClusterWatcher(cluster) if err != 
nil { log.Printf("Error starting Cluster Watcher %v: %v\n", cluster, err) } - rg.Go(func() error { + g.Go(func() error { c, err := con.GetClient(cluster) if err != nil { logutils.Error("Error in creating kubeconfig client", logutils.Fields{ @@ -238,42 +262,58 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn } var aov map[string][]string json.Unmarshal([]byte(resorder.(string)), &aov) - for i, res := range aov["resorder"] { - err = f(ac, c, res, appName, cluster, label) + // Keep retrying for reachability + for { + // Wait for cluster to be reachable + err = waitForClusterReady(c, cluster) if err != nil { - logutils.Error("Error in resource %s: %v", logutils.Fields{ - "error": err, - "cluster": cluster, - "resource": res, - }) - if breakonError { - // handle status tracking before exiting if at least one resource got handled - if i > 0 { - serr := sfn(c, appName, cluster, label) - if serr != nil { - logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + // TODO: Add error handling + return err + } + reachable := true + // Handle all resources in order + for i, res := range aov["resorder"] { + err = f(ac, c, res, appName, cluster, label) + if err != nil { + logutils.Error("Error in resource %s: %v", logutils.Fields{ + "error": err, + "cluster": cluster, + "resource": res, + }) + // If failure is due to reachability issues start retrying + if err = c.IsReachable(); err != nil { + reachable = false + break + } + if breakonError { + // handle status tracking before exiting if at least one resource got handled + if i > 0 { + serr := sfn(c, appName, cluster, label) + if serr != nil { + logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + } } + return err } - return err } } - } - serr := sfn(c, appName, cluster, label) - if serr != nil { - logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + // Check if the break from loop due to reachabilty issues + if 
reachable != false { + serr := sfn(c, appName, cluster, label) + if serr != nil { + logutils.Warn("Error handling status tracker", logutils.Fields{"error": serr}) + } + // Done processing cluster without errors + return nil + } } return nil }) } - if err := rg.Wait(); err != nil { - logutils.Error("Encountered error in App cluster", logutils.Fields{ - "error": err, - }) - return err - } return nil }) } + // Wait for all subtasks to complete if err := g.Wait(); err != nil { logutils.Error("Encountered error", logutils.Fields{ "error": err, @@ -285,28 +325,14 @@ func applyFnComApp(cid interface{}, con *connector.Connector, f fn, sfn statusfn // InstantiateComApp Instantiate Apps in Composite App func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error { - con := connector.Init(cid) - err := applyFnComApp(cid, con, instantiateResource, addStatusTracker, true) - if err != nil { - logutils.Error("InstantiateComApp unsuccessful", logutils.Fields{"error": err}) - return err - } - //Cleanup - con.RemoveClient() + // Start handling and return grpc immediately + go applyFnComApp(cid, instantiateResource, addStatusTracker, true) return nil } // TerminateComApp Terminates Apps in Composite App func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error { - con := connector.Init(cid) - err := applyFnComApp(cid, con, terminateResource, deleteStatusTracker, false) - if err != nil { - logutils.Error("TerminateComApp unsuccessful", logutils.Fields{ - "error": err, - }) - return err - } - //Cleanup - con.RemoveClient() + // Start handling and return grpc immediately + go applyFnComApp(cid, terminateResource, deleteStatusTracker, true) return nil }