Changes to add state and retry logic to rsync (Gerrit change 52/111052/25)
author Manjunath Ranganathaiah <manjunath.ranganathaiah@intel.com>
Fri, 7 Aug 2020 19:06:22 +0000 (19:06 +0000)
committer Eric Multanen <eric.w.multanen@intel.com>
Thu, 10 Sep 2020 04:42:13 +0000 (21:42 -0700)
- Adds retry watcher and related functionality.
- Adds code to update and get the status from the appcontext.
- Adds logic to handle state transition during terminate.

Issue-ID: MULTICLOUD-1005
Signed-off-by: Manjunath Ranganathaiah <manjunath.ranganathaiah@intel.com>
Change-Id: I2ed76efd9d8b6f40fec547bbe8b7d8a86f69ce07

src/clm/pkg/cluster/cluster.go
src/orchestrator/pkg/appcontext/appcontext.go
src/orchestrator/pkg/module/deployment_intent_groups.go
src/orchestrator/pkg/state/state_helper.go
src/rsync/go.mod
src/rsync/go.sum
src/rsync/pkg/client/client.go
src/rsync/pkg/context/context.go

diff --git a/src/clm/pkg/cluster/cluster.go b/src/clm/pkg/cluster/cluster.go
index 26a9d6d..fb8768d 100644
@@ -19,6 +19,7 @@ package cluster
 import (
        "time"
 
+       "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
        mtypes "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module/types"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/state"
@@ -423,6 +424,14 @@ func (v *ClusterClient) DeleteCluster(provider, name string) error {
 
        // remove the app contexts associated with this cluster
        if stateVal == state.StateEnum.Terminated {
+               // Verify that the appcontext has completed terminating
+               ctxid := state.GetLastContextIdFromStateInfo(s)
+               acStatus, err := state.GetAppContextStatus(ctxid)
+               if err == nil &&
+                       !(acStatus.Status == appcontext.AppContextStatusEnum.Terminated || acStatus.Status == appcontext.AppContextStatusEnum.TerminateFailed) {
+                       return pkgerrors.Errorf("Network intents for cluster have not completed terminating " + name)
+               }
+
                for _, id := range state.GetContextIdsFromStateInfo(s) {
                        context, err := state.GetAppContextFromId(id)
                        if err != nil {
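
The new guard makes DeleteCluster fail while the last appcontext is still Terminating, so callers are expected to retry. A minimal caller-side sketch under that assumption (the helper name and retry bounds are illustrative, not part of this change):

    package main

    import (
            "fmt"
            "time"

            "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
    )

    // waitAndDeleteCluster polls DeleteCluster until terminate has completed
    // on the cluster's last appcontext (illustrative bounds).
    func waitAndDeleteCluster(c *cluster.ClusterClient, provider, name string) error {
            for i := 0; i < 30; i++ {
                    if err := c.DeleteCluster(provider, name); err == nil {
                            return nil
                    }
                    // DeleteCluster errors while the appcontext is still Terminating
                    time.Sleep(2 * time.Second)
            }
            return fmt.Errorf("cluster %s did not finish terminating", name)
    }

    func main() {
            _ = waitAndDeleteCluster // sketch only; a real ClusterClient needs a db backend
    }
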
diff --git a/src/orchestrator/pkg/appcontext/appcontext.go b/src/orchestrator/pkg/appcontext/appcontext.go
index db2ba43..5d75794 100644
@@ -37,30 +37,30 @@ type AppContext struct {
 // AppContextStatus represents the current status of the appcontext
 //     Instantiating - instantiate has been invoked and is still in progress
 //     Instantiated - instantiate has completed
-//     PreTerminate - terminate has been invoked when in Instantiating status - need to clean up first
 //     Terminating - terminate has been invoked and is still in progress
 //     Terminated - terminate has completed
-//     Failed - the instantiate or terminate action has failed
+//     InstantiateFailed - the instantiate action has failed
+//     TerminateFailed - the terminate action has failed
 type AppContextStatus struct {
        Status StatusValue
 }
 type StatusValue string
 type statuses struct {
-       Instantiating StatusValue
-       Instantiated  StatusValue
-       PreTerminate  StatusValue
-       Terminating   StatusValue
-       Terminated    StatusValue
-       Failed        StatusValue
+       Instantiating     StatusValue
+       Instantiated      StatusValue
+       Terminating       StatusValue
+       Terminated        StatusValue
+       InstantiateFailed StatusValue
+       TerminateFailed   StatusValue
 }
 
 var AppContextStatusEnum = &statuses{
-       Instantiating: "Instantiating",
-       Instantiated:  "Instantiated",
-       PreTerminate:  "PreTerminate",
-       Terminating:   "Terminating",
-       Terminated:    "Terminated",
-       Failed:        "Failed",
+       Instantiating:     "Instantiating",
+       Instantiated:      "Instantiated",
+       Terminating:       "Terminating",
+       Terminated:        "Terminated",
+       InstantiateFailed: "InstantiateFailed",
+       TerminateFailed:   "TerminateFailed",
 }
 
 // CompositeAppMeta consists of projectName, CompositeAppName,
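
Splitting the old Failed status lets a reader of the stored status tell which action failed. A small sketch of the transition rule this enum implies, mirroring updateEndingAppContextStatus in src/rsync/pkg/context/context.go below:

    package main

    import (
            "fmt"

            "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
    )

    // endingStatus mirrors the ending-status logic: each in-progress status
    // resolves to its own success or failure terminal status.
    func endingStatus(current appcontext.StatusValue, failure bool) (appcontext.StatusValue, error) {
            switch current {
            case appcontext.AppContextStatusEnum.Instantiating:
                    if failure {
                            return appcontext.AppContextStatusEnum.InstantiateFailed, nil
                    }
                    return appcontext.AppContextStatusEnum.Instantiated, nil
            case appcontext.AppContextStatusEnum.Terminating:
                    if failure {
                            return appcontext.AppContextStatusEnum.TerminateFailed, nil
                    }
                    return appcontext.AppContextStatusEnum.Terminated, nil
            default:
                    return current, fmt.Errorf("invalid in-progress status %v", current)
            }
    }

    func main() {
            s, _ := endingStatus(appcontext.AppContextStatusEnum.Instantiating, true)
            fmt.Println(s) // InstantiateFailed
    }
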
diff --git a/src/orchestrator/pkg/module/deployment_intent_groups.go b/src/orchestrator/pkg/module/deployment_intent_groups.go
index f982985..dec6391 100644
@@ -21,6 +21,7 @@ import (
        "reflect"
        "time"
 
+       "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/state"
 
@@ -271,6 +272,14 @@ func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p s
 
        // remove the app contexts associated with this Deployment Intent Group
        if stateVal == state.StateEnum.Terminated {
+               // Verify that the appcontext has completed terminating
+               ctxid := state.GetLastContextIdFromStateInfo(s)
+               acStatus, err := state.GetAppContextStatus(ctxid)
+               if err == nil &&
+                       !(acStatus.Status == appcontext.AppContextStatusEnum.Terminated || acStatus.Status == appcontext.AppContextStatusEnum.TerminateFailed) {
+                       return pkgerrors.Errorf("DeploymentIntentGroup has not completed terminating: " + di)
+               }
+
                for _, id := range state.GetContextIdsFromStateInfo(s) {
                        context, err := state.GetAppContextFromId(id)
                        if err != nil {
diff --git a/src/orchestrator/pkg/state/state_helper.go b/src/orchestrator/pkg/state/state_helper.go
index 9d59fb7..1f926f8 100644
@@ -17,6 +17,8 @@
 package state
 
 import (
+       "encoding/json"
+
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
        pkgerrors "github.com/pkg/errors"
 )
@@ -69,3 +71,30 @@ func GetContextIdsFromStateInfo(s StateInfo) []string {
 
        return ids
 }
+
+func GetAppContextStatus(ctxid string) (appcontext.AppContextStatus, error) {
+
+       ac, err := GetAppContextFromId(ctxid)
+       if err != nil {
+               return appcontext.AppContextStatus{}, err
+       }
+
+       h, err := ac.GetCompositeAppHandle()
+       if err != nil {
+               return appcontext.AppContextStatus{}, err
+       }
+       sh, err := ac.GetLevelHandle(h, "status")
+       if err != nil {
+               return appcontext.AppContextStatus{}, err
+       }
+       s, err := ac.GetValue(sh)
+       if err != nil {
+               return appcontext.AppContextStatus{}, err
+       }
+       acStatus := appcontext.AppContextStatus{}
+       js, err := json.Marshal(s)
+       if err != nil {
+               return appcontext.AppContextStatus{}, err
+       }
+       if err := json.Unmarshal(js, &acStatus); err != nil {
+               return appcontext.AppContextStatus{}, err
+       }
+
+       return acStatus, nil
+
+}
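
GetAppContextStatus decodes whatever is stored under the composite app's "status" handle by round-tripping it through JSON into an AppContextStatus. A usage sketch (the context id is illustrative, and a running contextdb backend is assumed):

    package main

    import (
            "fmt"

            "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
            "github.com/onap/multicloud-k8s/src/orchestrator/pkg/state"
    )

    func main() {
            acStatus, err := state.GetAppContextStatus("1234567890") // illustrative id
            if err != nil {
                    fmt.Println("could not read status:", err)
                    return
            }
            done := acStatus.Status == appcontext.AppContextStatusEnum.Terminated ||
                    acStatus.Status == appcontext.AppContextStatusEnum.TerminateFailed
            fmt.Println("terminate complete:", done)
    }
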
diff --git a/src/rsync/go.mod b/src/rsync/go.mod
index 0fd2c78..b5f5c93 100644
@@ -27,6 +27,7 @@ require (
 replace (
        github.com/onap/multicloud-k8s/src/clm => ../clm
        github.com/onap/multicloud-k8s/src/monitor => ../monitor
+       github.com/onap/multicloud-k8s/src/orchestrator => ../orchestrator
        k8s.io/api => k8s.io/api v0.17.3
        k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.3
        k8s.io/apimachinery => k8s.io/apimachinery v0.17.3
diff --git a/src/rsync/go.sum b/src/rsync/go.sum
index 0063789..95895d0 100644
@@ -1216,10 +1216,14 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV
 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
 github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
 github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
+github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
 github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
+github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
 github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
 github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
 github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs=
+github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
+github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
 github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
 github.com/xlab/handysort v0.0.0-20150421192137-fb3537ed64a1/go.mod h1:QcJo0QPSfTONNIgpN5RA8prR7fF8nkF6cTWTcNerRO8=
diff --git a/src/rsync/pkg/client/client.go b/src/rsync/pkg/client/client.go
index a489b95..5920dea 100644
@@ -25,7 +25,7 @@ import (
 
 // DefaultValidation default action to validate. If `true` all resources by
 // default will be validated.
-const DefaultValidation = true
+const DefaultValidation = false
 
 // Client is a kubernetes client, like `kubectl`
 type Client struct {
diff --git a/src/rsync/pkg/context/context.go b/src/rsync/pkg/context/context.go
index f77482e..4b886ec 100644
@@ -21,6 +21,7 @@ import (
        "encoding/json"
        "fmt"
        "strings"
+       "sync"
        "time"
 
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
@@ -36,7 +37,9 @@ import (
 )
 
 type CompositeAppContext struct {
-       cid interface{}
+       cid   interface{}
+       chans []chan bool
+       mutex sync.Mutex
 }
 
 func getRes(ac appcontext.AppContext, name string, app string, cluster string) ([]byte, interface{}, error) {
@@ -144,26 +147,150 @@ func instantiateResource(ac appcontext.AppContext, c *kubeclient.Client, name st
        return nil
 }
 
+func updateResourceStatus(ac appcontext.AppContext, resState resourcestatus.ResourceStatus, app string, cluster string, aov map[string][]string) error {
+
+       for _, res := range aov["resorder"] {
+
+               rh, err := ac.GetResourceHandle(app, cluster, res)
+               if err != nil {
+                       return err
+               }
+               sh, err := ac.GetLevelHandle(rh, "status")
+               if err != nil {
+                       return err
+               }
+
+               s, err := ac.GetValue(sh)
+               if err != nil {
+                       return err
+               }
+               rStatus := resourcestatus.ResourceStatus{}
+               js, err := json.Marshal(s)
+               if err != nil {
+                       return err
+               }
+               err = json.Unmarshal(js, &rStatus)
+               if err != nil {
+                       return err
+               }
+               // no need to update a status that has reached a 'done' status
+               if rStatus.Status == resourcestatus.RsyncStatusEnum.Deleted ||
+                       rStatus.Status == resourcestatus.RsyncStatusEnum.Applied ||
+                       rStatus.Status == resourcestatus.RsyncStatusEnum.Failed {
+                       continue
+               }
+
+               err = ac.UpdateStatusValue(sh, resState)
+               if err != nil {
+                       return err
+               }
+       }
+
+       return nil
+
+}
+
+// return true if all resources have reached a 'done' status - e.g. Applied, Deleted or Failed
+func allResourcesDone(ac appcontext.AppContext, app string, cluster string, aov map[string][]string) bool {
+
+       for _, res := range aov["resorder"] {
+
+               rh, err := ac.GetResourceHandle(app, cluster, res)
+               if err != nil {
+                       return false
+               }
+               sh, err := ac.GetLevelHandle(rh, "status")
+               if err != nil {
+                       return false
+               }
+
+               s, err := ac.GetValue(sh)
+               if err != nil {
+                       return false
+               }
+               rStatus := resourcestatus.ResourceStatus{}
+               js, err := json.Marshal(s)
+               if err != nil {
+                       return false
+               }
+               err = json.Unmarshal(js, &rStatus)
+               if err != nil {
+                       return false
+               }
+               if rStatus.Status != resourcestatus.RsyncStatusEnum.Deleted &&
+                       rStatus.Status != resourcestatus.RsyncStatusEnum.Applied &&
+                       rStatus.Status != resourcestatus.RsyncStatusEnum.Failed {
+                       return false
+               }
+       }
+
+       return true
+
+}
+
 // Wait for 2 secs
 const waitTime = 2
 
-func waitForClusterReady(c *kubeclient.Client, cluster string) error {
+func waitForClusterReady(instca *CompositeAppContext, ac appcontext.AppContext, c *kubeclient.Client, appname string, cluster string, aov map[string][]string) error {
+
+       forceDone := false
+       resStateUpdated := false
+       ch := addChan(instca)
+
+       rch := make(chan error, 1)
+       checkReachable := func() {
+               err := c.IsReachable()
+               rch <- err
+       }
+
+       go checkReachable()
+Loop:
        for {
-               if err := c.IsReachable(); err != nil {
-                       // TODO: Add more realistic error checking
-                       // TODO: Add Incremental wait logic here
-                       time.Sleep(waitTime * time.Second)
-               } else {
+               select {
+               case rerr := <-rch:
+                       if rerr == nil {
+                               break Loop
+                       } else {
+                               logutils.Info("Cluster is not reachable - keep trying::", logutils.Fields{"cluster": cluster})
+                               go checkReachable()
+                       }
+               case <-ch:
+                       statusFailed := resourcestatus.ResourceStatus{
+                               Status: resourcestatus.RsyncStatusEnum.Failed,
+                       }
+                       err := updateResourceStatus(ac, statusFailed, appname, cluster, aov)
+                       if err != nil {
+                               deleteChan(instca, ch)
+                               return err
+                       }
+                       forceDone = true
+                       break Loop
+               case <-time.After(waitTime * time.Second):
+                       // on first timeout - cluster is apparently not reachable, update resources in
+                       // this group to 'Retrying'
+                       if !resStateUpdated {
+                               statusRetrying := resourcestatus.ResourceStatus{
+                                       Status: resourcestatus.RsyncStatusEnum.Retrying,
+                               }
+                               err := updateResourceStatus(ac, statusRetrying, appname, cluster, aov)
+                               if err != nil {
+                                       deleteChan(instca, ch)
+                                       return err
+                               }
+                               resStateUpdated = true
+                       }
                        break
                }
        }
-       logutils.Info("Cluster is reachable::", logutils.Fields{
-               "cluster": cluster,
-       })
+
+       deleteChan(instca, ch)
+       if forceDone {
+               return pkgerrors.Errorf("Termination of rsync cluster retry: " + cluster)
+       }
        return nil
 }
 
-// initializeResourceStatus sets the initial status of every resource appropriately based on the state of the AppContext
+// initializeAppContextStatus sets the initial status of every resource appropriately based on the state of the AppContext
 func initializeAppContextStatus(ac appcontext.AppContext, acStatus appcontext.AppContextStatus) error {
        h, err := ac.GetCompositeAppHandle()
        if err != nil {
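
waitForClusterReady now multiplexes three events in one select: the reachability probe returning, an exit message from the retry watcher, and a timeout that downgrades the cluster's resources to Retrying. The core pattern in isolation (a sketch with a stubbed probe, not the rsync code itself):

    package main

    import (
            "errors"
            "fmt"
            "time"
    )

    // probe stands in for kubeclient.Client.IsReachable
    func probe() error {
            time.Sleep(500 * time.Millisecond)
            return errors.New("cluster unreachable")
    }

    func waitReady(quit <-chan bool) error {
            rch := make(chan error, 1)
            go func() { rch <- probe() }()
            for {
                    select {
                    case err := <-rch:
                            if err == nil {
                                    return nil // reachable: resume applying resources
                            }
                            go func() { rch <- probe() }() // unreachable: probe again
                    case <-quit:
                            return errors.New("retry cancelled by terminate")
                    case <-time.After(2 * time.Second):
                            fmt.Println("still waiting") // rsync marks resources Retrying on the first pass
                    }
            }
    }

    func main() {
            quit := make(chan bool)
            go func() {
                    time.Sleep(5 * time.Second)
                    quit <- true // simulates the retry watcher broadcasting an exit
            }()
            fmt.Println(waitReady(quit))
    }
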
@@ -320,12 +447,18 @@ func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{},
        js, _ := json.Marshal(s)
        json.Unmarshal(js, &acStatus)
 
-       if failure {
-               acStatus.Status = appcontext.AppContextStatusEnum.Failed
-       } else if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating {
-               acStatus.Status = appcontext.AppContextStatusEnum.Instantiated
+       if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating {
+               if failure {
+                       acStatus.Status = appcontext.AppContextStatusEnum.InstantiateFailed
+               } else {
+                       acStatus.Status = appcontext.AppContextStatusEnum.Instantiated
+               }
        } else if acStatus.Status == appcontext.AppContextStatusEnum.Terminating {
-               acStatus.Status = appcontext.AppContextStatusEnum.Terminated
+               if failure {
+                       acStatus.Status = appcontext.AppContextStatusEnum.TerminateFailed
+               } else {
+                       acStatus.Status = appcontext.AppContextStatusEnum.Terminated
+               }
        } else {
                return pkgerrors.Errorf("Invalid AppContextStatus %v", acStatus)
        }
@@ -337,20 +470,197 @@ func updateEndingAppContextStatus(ac appcontext.AppContext, handle interface{},
        return nil
 }
 
+func getAppContextStatus(ac appcontext.AppContext) (*appcontext.AppContextStatus, error) {
+
+       h, err := ac.GetCompositeAppHandle()
+       if err != nil {
+               return nil, err
+       }
+       sh, err := ac.GetLevelHandle(h, "status")
+       if err != nil {
+               return nil, err
+       }
+       s, err := ac.GetValue(sh)
+       if err != nil {
+               return nil, err
+       }
+       acStatus := appcontext.AppContextStatus{}
+       js, err := json.Marshal(s)
+       if err != nil {
+               return nil, err
+       }
+       if err := json.Unmarshal(js, &acStatus); err != nil {
+               return nil, err
+       }
+
+       return &acStatus, nil
+
+}
+
 type fn func(ac appcontext.AppContext, client *kubeclient.Client, res string, app string, cluster string, label string) error
 
 type statusfn func(client *kubeclient.Client, app string, cluster string, label string) error
 
-func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error {
-       con := connector.Init(cid)
+func addChan(instca *CompositeAppContext) chan bool {
+
+       instca.mutex.Lock()
+       c := make(chan bool)
+       instca.chans = append(instca.chans, c)
+       instca.mutex.Unlock()
+
+       return c
+}
+
+func deleteChan(instca *CompositeAppContext, c chan bool) error {
+
+       var i int
+       instca.mutex.Lock()
+       for i = 0; i < len(instca.chans); i++ {
+               if instca.chans[i] == c {
+                       break
+               }
+       }
+
+       if i == len(instca.chans) {
+               instca.mutex.Unlock()
+               return pkgerrors.Errorf("Given channel was not found:")
+       }
+       instca.chans[i] = instca.chans[len(instca.chans)-1]
+       instca.chans = instca.chans[:len(instca.chans)-1]
+       instca.mutex.Unlock()
+
+       return nil
+}
+
+func waitForDone(ac appcontext.AppContext) {
+       count := 0
+       for {
+               time.Sleep(1 * time.Second)
+               count++
+               if count == 60*60 {
+                       logutils.Info("Wait for done watcher running..", logutils.Fields{})
+                       count = 0
+               }
+               acStatus, err := getAppContextStatus(ac)
+               if err != nil {
+                       logutils.Error("Failed to get the app context status", logutils.Fields{"error": err})
+                       continue
+               }
+               if acStatus.Status == appcontext.AppContextStatusEnum.Instantiated ||
+                       acStatus.Status == appcontext.AppContextStatusEnum.InstantiateFailed {
+                       return
+               }
+       }
+}
+
+func kickoffRetryWatcher(instca *CompositeAppContext, ac appcontext.AppContext, acStatus appcontext.AppContextStatus, wg *errgroup.Group) {
+
+       wg.Go(func() error {
+
+               count := 0
+               for {
+                       time.Sleep(1 * time.Second)
+                       count++
+                       if count == 60*60 {
+                               logutils.Info("Retry watcher running..", logutils.Fields{})
+                               count = 0
+                       }
+
+                       cStatus, err := getAppContextStatus(ac)
+                       if err != nil {
+                               logutils.Error("Failed to get the app context status", logutils.Fields{
+                                       "error": err,
+                               })
+                               return err
+                       }
+                       flag, err := getAppContextFlag(ac)
+                       if err != nil {
+                               logutils.Error("Failed to get the stop flag", logutils.Fields{
+                                       "error": err,
+                               })
+                               return err
+                       }
+                       if flag {
+                               instca.mutex.Lock()
+                               for i := 0; i < len(instca.chans); i++ {
+                                       instca.chans[i] <- true
+                                       logutils.Info("kickoffRetryWatcher - send an exit message", logutils.Fields{})
+                               }
+                               instca.mutex.Unlock()
+                               break
+                       }
+                       if acStatus.Status == appcontext.AppContextStatusEnum.Instantiating {
+                               if cStatus.Status == appcontext.AppContextStatusEnum.Instantiated ||
+                                       cStatus.Status == appcontext.AppContextStatusEnum.InstantiateFailed {
+                                       break
+                               }
+                       } else {
+                               if cStatus.Status == appcontext.AppContextStatusEnum.Terminated ||
+                                       cStatus.Status == appcontext.AppContextStatusEnum.TerminateFailed {
+                                       break
+                               }
+                       }
+
+               }
+               return nil
+       })
+
+}
+
+func getAppContextFlag(ac appcontext.AppContext) (bool, error) {
+       h, err := ac.GetCompositeAppHandle()
+       if err != nil {
+               return false, err
+       }
+       sh, err := ac.GetLevelHandle(h, "stopflag")
+       if sh == nil {
+               return false, err
+       }
+       v, err := ac.GetValue(sh)
+       if err != nil {
+               return false, err
+       }
+       flag, ok := v.(bool)
+       if !ok {
+               return false, pkgerrors.Errorf("stopflag value is not a bool")
+       }
+       return flag, nil
+}
+
+func updateAppContextFlag(cid interface{}, sf bool) error {
+       ac := appcontext.AppContext{}
+       _, err := ac.LoadAppContext(cid)
+       if err != nil {
+               return err
+       }
+       hc, err := ac.GetCompositeAppHandle()
+       if err != nil {
+               return err
+       }
+       sh, err := ac.GetLevelHandle(hc, "stopflag")
+       if sh == nil {
+               _, err = ac.AddLevelValue(hc, "stopflag", sf)
+       } else {
+               err = ac.UpdateValue(sh, sf)
+       }
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+func applyFnComApp(instca *CompositeAppContext, acStatus appcontext.AppContextStatus, f fn, sfn statusfn, breakonError bool) error {
+       con := connector.Init(instca.cid)
        //Cleanup
        defer con.RemoveClient()
        ac := appcontext.AppContext{}
-       h, err := ac.LoadAppContext(cid)
+       h, err := ac.LoadAppContext(instca.cid)
        if err != nil {
                return err
        }
 
+       // if terminating, wait for all retrying instantiate threads to exit
+       if acStatus.Status == appcontext.AppContextStatusEnum.Terminating {
+               waitForDone(ac)
+               err := updateAppContextFlag(instca.cid, false)
+               if err != nil {
+                       return err
+               }
+       }
+
        // initialize appcontext status
        err = initializeAppContextStatus(ac, acStatus)
        if err != nil {
@@ -375,6 +685,8 @@ func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn,
        })
        id, _ := ac.GetCompositeAppHandle()
        g, _ := errgroup.WithContext(context.Background())
+       wg, _ := errgroup.WithContext(context.Background())
+       kickoffRetryWatcher(instca, ac, acStatus, wg)
        // Iterate over all the subapps
        for _, app := range appList["apporder"] {
                appName := app
@@ -414,8 +726,13 @@ func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn,
                                        json.Unmarshal([]byte(resorder.(string)), &aov)
                                        // Keep retrying for reachability
                                        for {
+                                               done := allResourcesDone(ac, appName, cluster, aov)
+                                               if done {
+                                                       break
+                                               }
+
                                                // Wait for cluster to be reachable
-                                               err = waitForClusterReady(c, cluster)
+                                               err := waitForClusterReady(instca, ac, c, appName, cluster, aov)
                                                if err != nil {
                                                        // TODO: Add error handling
                                                        return err
@@ -479,19 +796,39 @@ func applyFnComApp(cid interface{}, acStatus appcontext.AppContextStatus, f fn,
                logutils.Error("Encountered error updating AppContext status", logutils.Fields{"error": err})
                return err
        }
+       if err := wg.Wait(); err != nil {
+               logutils.Error("Encountered error in watcher thread", logutils.Fields{"error": err})
+               return err
+       }
        return nil
 }
 
 // InstantiateComApp Instantiate Apps in Composite App
 func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
-       go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating},
+       instca.cid = cid
+       instca.chans = []chan bool{}
+       instca.mutex = sync.Mutex{}
+       err := updateAppContextFlag(cid, false)
+       if err != nil {
+               logutils.Error("Encountered error updating AppContext flag", logutils.Fields{"error": err})
+               return err
+       }
+       go applyFnComApp(instca, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Instantiating},
                instantiateResource, addStatusTracker, true)
        return nil
 }
 
 // TerminateComApp Terminates Apps in Composite App
 func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
-       go applyFnComApp(cid, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating},
+       instca.cid = cid
+       instca.chans = []chan bool{}
+       instca.mutex = sync.Mutex{}
+       err := updateAppContextFlag(cid, true)
+       if err != nil {
+               logutils.Error("Encountered error updating AppContext flag", logutils.Fields{"error": err})
+               return err
+       }
+       go applyFnComApp(instca, appcontext.AppContextStatus{Status: appcontext.AppContextStatusEnum.Terminating},
                terminateResource, deleteStatusTracker, false)
        return nil
 }
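
Both entry points now seed the channel registry and the stopflag (false for instantiate; true for terminate, which tells retrying instantiate threads to exit) before spawning applyFnComApp. A caller-side sketch (the context id is illustrative and a running contextdb backend is assumed):

    package main

    import (
            "fmt"

            con "github.com/onap/multicloud-k8s/src/rsync/pkg/context"
    )

    func main() {
            instca := con.CompositeAppContext{}
            // "1234567890" is an illustrative appcontext id
            if err := instca.InstantiateComApp("1234567890"); err != nil {
                    fmt.Println("instantiate kickoff failed:", err)
            }
            // a later terminate sets the stopflag, so any instantiate threads
            // still retrying unreachable clusters exit before termination starts
            if err := instca.TerminateComApp("1234567890"); err != nil {
                    fmt.Println("terminate kickoff failed:", err)
            }
    }
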