Adds composite app status update and query 57/109757/14
authorEric Multanen <eric.w.multanen@intel.com>
Thu, 2 Jul 2020 06:30:49 +0000 (23:30 -0700)
committerEric Multanen <eric.w.multanen@intel.com>
Wed, 8 Jul 2020 20:36:34 +0000 (13:36 -0700)
This patch provides the basic framework for supporting
monitoring of composite application resources in clusters.

1. Updates to the monitor files for use with v2.
2. Invokes the Watcher process per cluster/app when the
   app is instantiated.
3. Adds a ResourceBundleState CR resource to the cluster/app
   so that monitor will be able to update status to it.
4. Watcher updates appropriate appcontext status object
   when updates are made in clusters by monitor
5. Update appcontext library to define a status handle
   and object at the app/cluster level
6. Labels resources with an appropriate tracking label
   to coordinate with the ResourceBundleState CR

Issue-ID: MULTICLOUD-1042
Signed-off-by: Eric Multanen <eric.w.multanen@intel.com>
Change-Id: If007c1fd86ca7a65bb941d1776cfd2d3afed766b

26 files changed:
deployments/kubernetes/cleanup-emco.sh [moved from kud/tests/cleanup-composite-vfw.sh with 80% similarity, mode: 0755]
kud/tests/README-composite-vfw.txt
kud/tests/vfw-test-clean-cluster.sh [changed mode: 0644->0755]
kud/tests/vfw-test.sh
src/monitor/build/Dockerfile
src/monitor/deploy/cluster_role.yaml [new file with mode: 0644]
src/monitor/deploy/clusterrole_binding.yaml [new file with mode: 0644]
src/monitor/deploy/monitor-cleanup.sh [new file with mode: 0755]
src/monitor/deploy/monitor-deploy.sh [new file with mode: 0755]
src/monitor/deploy/operator.yaml
src/monitor/deploy/role.yaml
src/monitor/go.mod
src/monitor/pkg/apis/k8splugin/v1alpha1/types.go
src/orchestrator/api/api.go
src/orchestrator/api/instantiation_handler.go
src/orchestrator/go.mod
src/orchestrator/pkg/appcontext/appcontext.go
src/orchestrator/pkg/appcontext/appcontext_test.go
src/orchestrator/pkg/module/deployment_intent_groups.go
src/orchestrator/pkg/module/instantiation.go
src/orchestrator/pkg/module/instantiation_appcontext_helper.go
src/orchestrator/pkg/rtcontext/rtcontext.go
src/rsync/pkg/connector/connector.go
src/rsync/pkg/context/context.go
src/rsync/pkg/resource/resource.go
src/rsync/pkg/status/status.go

old mode 100644 (file)
new mode 100755 (executable)
similarity index 80%
rename from kud/tests/cleanup-composite-vfw.sh
rename to deployments/kubernetes/cleanup-emco.sh
index 7f96e8a..a8aef47
@@ -14,8 +14,3 @@ kubectl -n onap4k8s delete configmap orchestrator
 kubectl -n onap4k8s delete configmap ncm
 kubectl -n onap4k8s delete configmap ovnaction
 kubectl -n onap4k8s delete configmap rsync
-
-# delete the networks
-kubectl delete network protected-private-net
-kubectl delete providernetwork emco-private-net
-kubectl delete providernetwork unprotected-private-net
index d2018c0..3f334a2 100644 (file)
@@ -7,7 +7,29 @@ As written, the vfw-test.sh script assumes 3 clusters
 
 The edge cluster in which vFW will be instantiated should be KUD clusters.
 
-# Preparations
+# Edge cluster preparation
+
+For status monitoring support, the 'monitor' docker image must be built and
+deployed.
+
+In multicloud-k8s repo:
+       cd multicloud-k8s/src/monitor
+       docker build -f build/Dockerfile . -t monitor
+       <tag and push docker image to dockerhub ...>
+
+Deploy monitor program in each cluster (assumes multicloud-k8s repo is present in cloud)
+       # one time setup per cluster - install the CRD
+       cd multicloud-k8s/src/monitor/deploy/crds
+       kubectl apply -f crds/k8splugin_v1alpha1_resourcebundlestate_crd.yaml
+       
+       # one time setup per cluster
+       # update yaml files with correct image
+       # (cleanup first, if monitor was already installed - see monitor-cleanup.sh)
+       cd multicloud-k8s/src/monitor/deploy
+       monitor-deploy.sh
+
+
+# Preparation of the vFW Composit Application
 
 ## Prepare the Composite vFW Application Charts and Profiles
 
old mode 100644 (file)
new mode 100755 (executable)
index 3d38646..153924c
@@ -1,6 +1,17 @@
+#!/bin/bash
+
+# script to delete vfw resources (until terminate is completed)
 kubectl delete deploy fw0-packetgen
 kubectl delete deploy fw0-firewall
 kubectl delete deploy fw0-sink
 kubectl delete service packetgen-service
 kubectl delete service sink-service
 kubectl delete configmap sink-configmap
+
+kubectl delete network protected-private-net
+kubectl delete providernetwork emco-private-net
+kubectl delete providernetwork unprotected-private-net
+
+for i in `kubectl get resourcebundlestate --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}'`; do
+    kubectl delete resourcebundlestate $i
+done
index 2bdddcd..9c431ef 100755 (executable)
@@ -969,6 +969,9 @@ function instantiateVfw {
     call_api -d "{ }" "${base_url_orchestrator}/projects/${projectname}/composite-apps/${vfw_compositeapp_name}/${vfw_compositeapp_version}/deployment-intent-groups/${deployment_intent_group_name}/instantiate"
 }
 
+function statusVfw {
+    call_api "${base_url_orchestrator}/projects/${projectname}/composite-apps/${vfw_compositeapp_name}/${vfw_compositeapp_version}/deployment-intent-groups/${deployment_intent_group_name}/status"
+}
 
 function usage {
     echo "Usage: $0  create|get|delete|apply|terminate|instantiate"
@@ -986,12 +989,14 @@ function usage {
     echo "    delete - deletes all resources in ncm, ovnaction, clm resources created for vfw"
     echo "    apply - applys the network intents - e.g. networks created in ncm"
     echo "    instantiate - approves and instantiates the composite app via the generic deployment intent"
+    echo "    status - get status of deployed resources"
     echo "    terminate - remove the network inents created by ncm"
     echo ""
     echo "    a reasonable test sequence:"
     echo "    1.  create"
     echo "    2.  apply"
     echo "    3.  instantiate"
+    echo "    4.  status"
 
     exit
 }
@@ -1050,5 +1055,6 @@ case "$1" in
     "apply" ) applyNcmData ;;
     "terminate" ) terminateNcmData ;;
     "instantiate" ) instantiateVfw ;;
+    "status" ) statusVfw ;;
     *) usage ;;
 esac
index 812eb47..9ecff16 100644 (file)
@@ -1,15 +1,16 @@
-FROM registry.access.redhat.com/ubi7/ubi-minimal:latest
+FROM golang:1.14.1
 
-ENV OPERATOR=/usr/local/bin/monitor \
-    USER_UID=1001 \
-    USER_NAME=monitor
+WORKDIR /go/src/github.com/onap/multicloud-k8s/src/monitor
+COPY ./ ./
+RUN make all
 
-# install operator binary
-COPY _output/bin/monitor ${OPERATOR}
+FROM ubuntu:16.04
 
-COPY bin /usr/local/bin
-RUN  /usr/local/bin/user_setup
+WORKDIR /opt/monitor
+RUN groupadd -r monitor && useradd -r -g monitor monitor
+RUN chown monitor:monitor /opt/monitor -R
+COPY --chown=monitor --from=0 /go/src/github.com/onap/multicloud-k8s/src/monitor/monitor ./
 
-ENTRYPOINT ["/usr/local/bin/entrypoint"]
+USER monitor
+ENTRYPOINT ["/opt/monitor/monitor"]
 
-USER ${USER_UID}
diff --git a/src/monitor/deploy/cluster_role.yaml b/src/monitor/deploy/cluster_role.yaml
new file mode 100644 (file)
index 0000000..0732e8d
--- /dev/null
@@ -0,0 +1,72 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  creationTimestamp: null
+  name: monitor
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - pods
+  - services
+  - endpoints
+  - persistentvolumeclaims
+  - events
+  - configmaps
+  - secrets
+  verbs:
+  - '*'
+- apiGroups:
+  - apps
+  resources:
+  - deployments
+  - daemonsets
+  - replicasets
+  - statefulsets
+  verbs:
+  - '*'
+- apiGroups:
+  - monitoring.coreos.com
+  resources:
+  - servicemonitors
+  verbs:
+  - get
+  - create
+- apiGroups:
+  - apps
+  resourceNames:
+  - monitor
+  resources:
+  - deployments/finalizers
+  verbs:
+  - update
+- apiGroups:
+  - ""
+  resources:
+  - pods
+  verbs:
+  - get
+- apiGroups:
+  - apps
+  resources:
+  - replicasets
+  verbs:
+  - get
+- apiGroups:
+  - k8splugin.io
+  resources:
+  - '*'
+  verbs:
+  - '*'
+- apiGroups:
+  - batch
+  resources:
+  - '*'
+  verbs:
+  - '*'
+- apiGroups:
+  - extensions
+  resources:
+  - '*'
+  verbs:
+  - '*'
diff --git a/src/monitor/deploy/clusterrole_binding.yaml b/src/monitor/deploy/clusterrole_binding.yaml
new file mode 100644 (file)
index 0000000..73e7403
--- /dev/null
@@ -0,0 +1,12 @@
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: monitor
+subjects:
+- kind: ServiceAccount
+  name: monitor
+  namespace: default
+roleRef:
+  kind: ClusterRole
+  name: monitor
+  apiGroup: rbac.authorization.k8s.io
diff --git a/src/monitor/deploy/monitor-cleanup.sh b/src/monitor/deploy/monitor-cleanup.sh
new file mode 100755 (executable)
index 0000000..2172507
--- /dev/null
@@ -0,0 +1,6 @@
+kubectl delete rolebinding monitor
+kubectl delete clusterrolebinding monitor
+kubectl delete role monitor
+kubectl delete clusterrole monitor
+kubectl delete serviceaccount monitor
+kubectl delete deploy monitor
diff --git a/src/monitor/deploy/monitor-deploy.sh b/src/monitor/deploy/monitor-deploy.sh
new file mode 100755 (executable)
index 0000000..47c7120
--- /dev/null
@@ -0,0 +1,6 @@
+kubectl apply -f role.yaml
+kubectl apply -f cluster_role.yaml
+kubectl apply -f role_binding.yaml
+kubectl apply -f clusterrole_binding.yaml
+kubectl apply -f service_account.yaml
+kubectl apply -f operator.yaml
index a06c07d..93e4522 100644 (file)
@@ -3,30 +3,28 @@ kind: Deployment
 metadata:
   name: monitor
   labels:
-    "emco/deployment-id": "bionic-beaver"
+    "emco/deployment-id": "monitor"
 spec:
   replicas: 1
   selector:
     matchLabels:
-      "emco/deployment-id": "bionic-beaver"
+      "emco/deployment-id": "monitor"
   template:
     metadata:
       labels:
-        "emco/deployment-id": "bionic-beaver"
+        "emco/deployment-id": "monitor"
     spec:
       serviceAccountName: monitor
       containers:
         - name: monitor
           # Replace this with the built image name
-          image: k8splugin.io/monitor:latest
+          image: ewmduck/monitor:latest
           command:
-          - monitor
+          - /opt/monitor/monitor
           imagePullPolicy: IfNotPresent
           env:
             - name: WATCH_NAMESPACE
-              valueFrom:
-                fieldRef:
-                  fieldPath: metadata.namespace
+              value: ""
             - name: POD_NAME
               valueFrom:
                 fieldRef:
index 4d0fd1b..c48141a 100644 (file)
@@ -58,3 +58,15 @@ rules:
   - '*'
   verbs:
   - '*'
+- apiGroups:
+  - batch
+  resources:
+  - '*'
+  verbs:
+  - '*'
+- apiGroups:
+  - extensions
+  resources:
+  - '*'
+  verbs:
+  - '*'
index ec48d26..6eff59a 100644 (file)
@@ -32,6 +32,4 @@ replace (
        sigs.k8s.io/controller-tools => sigs.k8s.io/controller-tools v0.1.11-0.20190411181648-9d55346c2bde
 )
 
-// Remove hg dependency using this mirror
 replace github.com/operator-framework/operator-sdk => github.com/operator-framework/operator-sdk v0.9.0
-
index 231f226..064591f 100644 (file)
@@ -15,8 +15,8 @@ import (
 // +kubebuilder:subresource:status
 // +genclient
 type ResourceBundleState struct {
-       metav1.TypeMeta   `json:",inline"`
-       metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
+       metav1.TypeMeta   `json:",inline" yaml:",inline"`
+       metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata" yaml:"metadata"`
 
        Spec   ResourceBundleStateSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`
        Status ResourceBundleStatus    `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
index 2470a1b..5abbb96 100644 (file)
@@ -180,6 +180,8 @@ func NewRouter(projectClient moduleLib.ProjectManager,
        }
 
        router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/instantiate", instantiationHandler.instantiateHandler).Methods("POST")
+       router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/terminate", instantiationHandler.terminateHandler).Methods("POST")
+       router.HandleFunc("/projects/{project-name}/composite-apps/{composite-app-name}/{composite-app-version}/deployment-intent-groups/{deployment-intent-group-name}/status", instantiationHandler.statusHandler).Methods("GET")
 
        return router
 }
index c95785f..ce50e5b 100644 (file)
 package api
 
 import (
+       "encoding/json"
+       "net/http"
+
        "github.com/gorilla/mux"
        moduleLib "github.com/onap/multicloud-k8s/src/orchestrator/pkg/module"
-       "net/http"
 )
 
 /* Used to store backend implementation objects
@@ -45,3 +47,45 @@ func (h instantiationHandler) instantiateHandler(w http.ResponseWriter, r *http.
        w.WriteHeader(http.StatusAccepted)
 
 }
+
+func (h instantiationHandler) terminateHandler(w http.ResponseWriter, r *http.Request) {
+
+       vars := mux.Vars(r)
+       p := vars["project-name"]
+       ca := vars["composite-app-name"]
+       v := vars["composite-app-version"]
+       di := vars["deployment-intent-group-name"]
+
+       iErr := h.client.Terminate(p, ca, v, di)
+       if iErr != nil {
+               http.Error(w, iErr.Error(), http.StatusInternalServerError)
+               return
+       }
+       w.WriteHeader(http.StatusAccepted)
+
+}
+
+func (h instantiationHandler) statusHandler(w http.ResponseWriter, r *http.Request) {
+
+       vars := mux.Vars(r)
+       p := vars["project-name"]
+       ca := vars["composite-app-name"]
+       v := vars["composite-app-version"]
+       di := vars["deployment-intent-group-name"]
+
+       status, iErr := h.client.Status(p, ca, v, di)
+       if iErr != nil {
+               http.Error(w, iErr.Error(), http.StatusInternalServerError)
+               return
+       }
+
+       w.Header().Set("Content-Type", "application/json")
+       w.WriteHeader(http.StatusOK)
+       iErr = json.NewEncoder(w).Encode(status)
+       if iErr != nil {
+               http.Error(w, iErr.Error(), http.StatusInternalServerError)
+               return
+       }
+       w.WriteHeader(http.StatusAccepted)
+
+}
index 223dc06..3f14f00 100644 (file)
@@ -2,17 +2,12 @@ module github.com/onap/multicloud-k8s/src/orchestrator
 
 require (
        github.com/MakeNowJust/heredoc v1.0.0 // indirect
-       github.com/Masterminds/goutils v1.1.0 // indirect
        github.com/Masterminds/semver v1.5.0 // indirect
        github.com/Masterminds/sprig v2.22.0+incompatible // indirect
        github.com/chai2010/gettext-go v0.0.0-20170215093142-bf70f2a70fb1
        github.com/coreos/etcd v3.3.12+incompatible
-       github.com/cyphar/filepath-securejoin v0.2.2 // indirect
        github.com/docker/docker v1.13.1 // indirect
-       github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c // indirect
-       github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect
        github.com/ghodss/yaml v1.0.0
-       github.com/gobwas/glob v0.2.3 // indirect
        github.com/golang/protobuf v1.4.1
        github.com/gorilla/handlers v1.3.0
        github.com/gorilla/mux v1.7.3
@@ -21,34 +16,32 @@ require (
        github.com/lib/pq v1.6.0 // indirect
        github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
        github.com/mitchellh/copystructure v1.0.0 // indirect
-       github.com/mitchellh/go-wordwrap v1.0.0 // indirect
        github.com/onap/multicloud-k8s/src/clm v0.0.0-00010101000000-000000000000
+       github.com/onap/multicloud-k8s/src/monitor v0.0.0-20200630152613-7c20f73e7c5d
        github.com/onap/multicloud-k8s/src/ncm v0.0.0-20200515060444-c77850a75eee
+       github.com/onap/multicloud-k8s/src/rsync v0.0.0-20200630152613-7c20f73e7c5d
        github.com/pkg/errors v0.8.1
        github.com/rubenv/sql-migrate v0.0.0-20200429072036-ae26b214fa43 // indirect
        github.com/russross/blackfriday v1.5.2
        github.com/sirupsen/logrus v1.4.2
        github.com/spf13/cobra v1.0.0 // indirect
-       github.com/technosophos/moniker v0.0.0-20180509230615-a5dbd03a2245 // indirect
        go.etcd.io/etcd v3.3.12+incompatible
        go.mongodb.org/mongo-driver v1.0.0
        golang.org/x/net v0.0.0-20200301022130-244492dfa37a
        google.golang.org/grpc v1.27.1
        google.golang.org/protobuf v1.24.0
        gopkg.in/square/go-jose.v2 v2.5.1 // indirect
+       gopkg.in/yaml.v2 v2.2.8
        gopkg.in/yaml.v3 v3.0.0-20200506231410-2ff61e1afc86
-       k8s.io/apiextensions-apiserver v0.0.0-00010101000000-000000000000 // indirect
        k8s.io/apimachinery v0.0.0-20190831074630-461753078381
-       k8s.io/apiserver v0.0.0-00010101000000-000000000000 // indirect
-       k8s.io/cli-runtime v0.0.0-00010101000000-000000000000 // indirect
        k8s.io/cloud-provider v0.0.0-00010101000000-000000000000 // indirect
        k8s.io/helm v2.14.3+incompatible
        sigs.k8s.io/kustomize v2.0.3+incompatible // indirect
-       vbom.ml/util v0.0.0-20180919145318-efcd4e0f9787 // indirect
 )
 
 replace (
        github.com/onap/multicloud-k8s/src/clm => ../clm
+       github.com/onap/multicloud-k8s/src/monitor => ../monitor
        k8s.io/api => k8s.io/api v0.0.0-20190409021203-6e4e0e4f393b
        k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190409022649-727a075fdec8
        k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190404173353-6a84e37a896d
index a847ae3..cdf23bf 100644 (file)
@@ -18,10 +18,11 @@ package appcontext
 
 import (
        "fmt"
+       "strings"
+
        log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/rtcontext"
        pkgerrors "github.com/pkg/errors"
-       "strings"
 )
 
 // metaPrefix used for denoting clusterMeta level
@@ -413,6 +414,59 @@ func (ac *AppContext) GetResourceInstruction(appname string, clustername string,
        return v, nil
 }
 
+//AddStatus for holding status of all resources under app and cluster
+// handle should be a cluster handle
+func (ac *AppContext) AddStatus(handle interface{}, value interface{}) (interface{}, error) {
+       h, err := ac.rtc.RtcAddStatus(handle, value)
+       if err != nil {
+               return nil, err
+       }
+       log.Info(":: Added status handle ::", log.Fields{"StatusHandler": h})
+
+       return h, nil
+}
+
+//DeleteStatus for the given the handle
+func (ac *AppContext) DeleteStatus(handle interface{}) error {
+       err := ac.rtc.RtcDeletePair(handle)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+//Return the handle for status for a given app and cluster
+func (ac *AppContext) GetStatusHandle(appname string, clustername string) (interface{}, error) {
+       if appname == "" {
+               return nil, pkgerrors.Errorf("Not a valid run time context app name")
+       }
+       if clustername == "" {
+               return nil, pkgerrors.Errorf("Not a valid run time context cluster name")
+       }
+
+       rh, err := ac.rtc.RtcGet()
+       if err != nil {
+               return nil, err
+       }
+
+       acrh := fmt.Sprintf("%v", rh) + "app/" + appname + "/cluster/" + clustername + "/status/"
+       hs, err := ac.rtc.RtcGetHandles(acrh)
+       if err != nil {
+               return nil, err
+       }
+       for _, v := range hs {
+               if v == acrh {
+                       return v, nil
+               }
+       }
+       return nil, pkgerrors.Errorf("No handle was found for the given resource")
+}
+
+//UpdateStatusValue updates the status value with the given handle
+func (ac *AppContext) UpdateStatusValue(handle interface{}, value interface{}) error {
+       return ac.rtc.RtcUpdateValue(handle, value)
+}
+
 //Return all the handles under the composite app
 func (ac *AppContext) GetAllHandles(handle interface{}) ([]interface{}, error) {
        hs, err := ac.rtc.RtcGetHandles(handle)
index 05c7370..92c4311 100644 (file)
@@ -145,6 +145,10 @@ func (c *MockRunTimeContext) RtcUpdateValue(handle interface{}, value interface{
        return c.Err
 }
 
+func (rtc *MockRunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) {
+       return nil, nil
+}
+
 func TestCreateCompositeApp(t *testing.T) {
        var ac = AppContext{}
        testCases := []struct {
index 16a14c7..3412a03 100644 (file)
@@ -20,6 +20,7 @@ import (
        "encoding/json"
        "reflect"
 
+       appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
 
        pkgerrors "github.com/pkg/errors"
@@ -61,6 +62,7 @@ type OverrideValues struct {
 type DeploymentIntentGroupManager interface {
        CreateDeploymentIntentGroup(d DeploymentIntentGroup, p string, ca string, v string) (DeploymentIntentGroup, error)
        GetDeploymentIntentGroup(di string, p string, ca string, v string) (DeploymentIntentGroup, error)
+       GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, error)
        DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error
 }
 
@@ -86,6 +88,7 @@ func (dk DeploymentIntentGroupKey) String() string {
 type DeploymentIntentGroupClient struct {
        storeName   string
        tagMetaData string
+       tagContext  string
 }
 
 // NewDeploymentIntentGroupClient return an instance of DeploymentIntentGroupClient which implements DeploymentIntentGroupManager
@@ -93,6 +96,7 @@ func NewDeploymentIntentGroupClient() *DeploymentIntentGroupClient {
        return &DeploymentIntentGroupClient{
                storeName:   "orchestrator",
                tagMetaData: "deploymentintentgroupmetadata",
+               tagContext:  "contextid",
        }
 }
 
@@ -160,6 +164,34 @@ func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroup(di string, p stri
 
 }
 
+// GetDeploymentIntentGroup returns the DeploymentIntentGroup with a given name, project, compositeApp and version of compositeApp
+func (c *DeploymentIntentGroupClient) GetDeploymentIntentGroupContext(di string, p string, ca string, v string) (appcontext.AppContext, error) {
+
+       key := DeploymentIntentGroupKey{
+               Name:         di,
+               Project:      p,
+               CompositeApp: ca,
+               Version:      v,
+       }
+
+       result, err := db.DBconn.Find(c.storeName, key, c.tagContext)
+       if err != nil {
+               return appcontext.AppContext{}, pkgerrors.Wrap(err, "Get DeploymentIntentGroup Context error")
+       }
+
+       if result != nil {
+               ctxVal := string(result[0])
+               var cc appcontext.AppContext
+               _, err = cc.LoadAppContext(ctxVal)
+               if err != nil {
+                       return appcontext.AppContext{}, pkgerrors.Wrap(err, "Error loading DeploymentIntentGroup Appcontext")
+               }
+               return cc, nil
+       }
+
+       return appcontext.AppContext{}, pkgerrors.New("Error getting DeploymentIntentGroup AppContext")
+}
+
 // DeleteDeploymentIntentGroup deletes a DeploymentIntentGroup
 func (c *DeploymentIntentGroupClient) DeleteDeploymentIntentGroup(di string, p string, ca string, v string) error {
        k := DeploymentIntentGroupKey{
index 043b80f..9432e4b 100644 (file)
@@ -21,6 +21,7 @@ import (
        "encoding/json"
        "fmt"
 
+       rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
        gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
        log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
@@ -42,17 +43,25 @@ type InstantiationClient struct {
        db InstantiationClientDbInfo
 }
 
+type ClusterAppStatus struct {
+       Cluster string
+       App     string
+       Status  rb.ResourceBundleStatus
+}
+
+type StatusData struct {
+       Data []ClusterAppStatus
+}
+
 /*
 InstantiationKey used in storing the contextid in the momgodb
 It consists of
-GenericPlacementIntentName,
 ProjectName,
 CompositeAppName,
 CompositeAppVersion,
 DeploymentIntentGroup
 */
 type InstantiationKey struct {
-       IntentName            string
        Project               string
        CompositeApp          string
        Version               string
@@ -64,6 +73,8 @@ type InstantiationKey struct {
 type InstantiationManager interface {
        //ApproveInstantiation(p string, ca string, v string, di string) (error)
        Instantiate(p string, ca string, v string, di string) error
+       Status(p string, ca string, v string, di string) (StatusData, error)
+       Terminate(p string, ca string, v string, di string) error
 }
 
 // InstantiationClientDbInfo consists of storeName and tagContext
@@ -229,6 +240,12 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
                        return pkgerrors.Wrapf(err, "Unable to get the resources for app :: %s", eachApp.Metadata.Name)
                }
 
+               statusResource, err := getStatusResource(ctxval.(string), eachApp.Metadata.Name)
+               if err != nil {
+                       return pkgerrors.Wrapf(err, "Unable to generate the status resource for app :: %s", eachApp.Metadata.Name)
+               }
+               resources = append(resources, statusResource)
+
                specData, err := NewAppIntentClient().GetAllIntentsByApp(eachApp.Metadata.Name, p, ca, v, gIntent)
                if err != nil {
                        return pkgerrors.Wrap(err, "Unable to get the intents for app")
@@ -269,12 +286,11 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
        //END: storing into etcd
 
        // BEGIN:: save the context in the orchestrator db record
-       key := InstantiationKey{
-               IntentName:            gIntent,
-               Project:               p,
-               CompositeApp:          ca,
-               Version:               v,
-               DeploymentIntentGroup: di,
+       key := DeploymentIntentGroupKey{
+               Name:         di,
+               Project:      p,
+               CompositeApp: ca,
+               Version:      v,
        }
 
        err = db.DBconn.Insert(c.db.storeName, key, nil, c.db.tagContext, ctxval)
@@ -324,3 +340,84 @@ func (c InstantiationClient) Instantiate(p string, ca string, v string, di strin
        log.Info(":: Done with instantiation... ::", log.Fields{"CompositeAppName": ca})
        return err
 }
+
+/*
+Status takes in projectName, compositeAppName, compositeAppVersion,
+DeploymentIntentName. This method is responsible obtaining the status of
+the deployment, which is made available in the appcontext.
+*/
+func (c InstantiationClient) Status(p string, ca string, v string, di string) (StatusData, error) {
+
+       ac, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+       if err != nil {
+               return StatusData{}, pkgerrors.Wrap(err, "deploymentIntentGroup not found "+di)
+       }
+
+       // Get all apps in this composite app
+       allApps, err := NewAppClient().GetApps(p, ca, v)
+       if err != nil {
+               return StatusData{}, pkgerrors.Wrap(err, "Not finding the apps")
+       }
+
+       var diStatus StatusData
+       diStatus.Data = make([]ClusterAppStatus, 0)
+
+       // Loop through each app and get the status data for each cluster in the app
+       for _, app := range allApps {
+               // Get the clusters in the appcontext for this app
+               clusters, err := ac.GetClusterNames(app.Metadata.Name)
+               if err != nil {
+                       log.Info(":: No clusters for app ::", log.Fields{"AppName": app.Metadata.Name})
+                       continue
+               }
+
+               for _, cluster := range clusters {
+                       handle, err := ac.GetStatusHandle(app.Metadata.Name, cluster)
+                       if err != nil {
+                               log.Info(":: No status handle for cluster, app ::",
+                                       log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+                               continue
+                       }
+                       statusValue, err := ac.GetValue(handle)
+                       if err != nil {
+                               log.Info(":: No status value for cluster, app ::",
+                                       log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+                               continue
+                       }
+                       log.Info(":: STATUS VALUE ::", log.Fields{"statusValue": statusValue})
+                       var statusData ClusterAppStatus
+                       err = json.Unmarshal([]byte(statusValue.(string)), &statusData.Status)
+                       if err != nil {
+                               log.Info(":: Error unmarshaling status value for cluster, app ::",
+                                       log.Fields{"Cluster": cluster, "AppName": app.Metadata.Name, "Error": err})
+                               continue
+                       }
+                       statusData.Cluster = cluster
+                       statusData.App = app.Metadata.Name
+                       log.Info(":: STATUS DATA ::", log.Fields{"status": statusData})
+
+                       diStatus.Data = append(diStatus.Data, statusData)
+               }
+       }
+
+       return diStatus, nil
+}
+
+/*
+Terminate takes in projectName, compositeAppName, compositeAppVersion,
+DeploymentIntentName and calls rsync to terminate.
+*/
+func (c InstantiationClient) Terminate(p string, ca string, v string, di string) error {
+
+       //ac, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+       _, err := NewDeploymentIntentGroupClient().GetDeploymentIntentGroupContext(di, p, ca, v)
+       if err != nil {
+               return pkgerrors.Wrap(err, "deploymentIntentGroup not found "+di)
+       }
+
+       // TODO - make call to rsync to terminate the composite app deployment
+       //        will leave the appcontext in place for clean up later
+       //        so monitoring status can be performed
+
+       return nil
+}
index 43ddd6d..e6e2bf3 100644 (file)
@@ -25,12 +25,16 @@ import (
        "encoding/json"
        "io/ioutil"
 
+       jyaml "github.com/ghodss/yaml"
+
+       rb "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
        gpic "github.com/onap/multicloud-k8s/src/orchestrator/pkg/gpic"
        log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
        "github.com/onap/multicloud-k8s/src/orchestrator/utils"
        "github.com/onap/multicloud-k8s/src/orchestrator/utils/helm"
        pkgerrors "github.com/pkg/errors"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 // resource consists of name of reource
@@ -90,6 +94,45 @@ func getResources(st []helm.KubernetesResourceTemplate) ([]resource, error) {
        return resources, nil
 }
 
+// addStatusResource adds a status monitoring resource to the app
+// which consists of name(name+kind) and content
+func getStatusResource(id, app string) (resource, error) {
+
+       var statusCr rb.ResourceBundleState
+
+       label := id + "-" + app
+       name := app + "-" + id
+
+       statusCr.TypeMeta.APIVersion = "k8splugin.io/v1alpha1"
+       statusCr.TypeMeta.Kind = "ResourceBundleState"
+       statusCr.SetName(name)
+
+       labels := make(map[string]string)
+       labels["emco/deployment-id"] = label
+       statusCr.SetLabels(labels)
+
+       labelSelector, err := metav1.ParseToLabelSelector("emco/deployment-id = " + label)
+       if err != nil {
+               log.Info(":: ERROR Parsing Label Selector ::", log.Fields{"Error": err})
+       } else {
+               statusCr.Spec.Selector = labelSelector
+       }
+
+       // Marshaling to json then convert to yaml works better than marshaling to yaml
+       // The 'apiVersion' attribute was marshaling to 'apiversion'
+       //      y, _ := yaml.Marshal(&statusCr)
+       j, _ := json.Marshal(&statusCr)
+       y, _ := jyaml.JSONToYAML(j)
+       log.Info(":: RESULTING STATUS CR ::", log.Fields{"StatusCR": y})
+
+       statusResource := resource{
+               name:        name + "+" + "ResourceBundleState",
+               filecontent: string(y),
+       }
+
+       return statusResource, nil
+}
+
 func addResourcesToCluster(ct appcontext.AppContext, ch interface{}, resources []resource) error {
 
        var resOrderInstr struct {
index 432c5d8..f3905eb 100644 (file)
@@ -18,11 +18,12 @@ package rtcontext
 
 import (
        "fmt"
-       "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb"
-       pkgerrors "github.com/pkg/errors"
        "math/rand"
        "strings"
        "time"
+
+       "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/contextdb"
+       pkgerrors "github.com/pkg/errors"
 )
 
 const maxrand = 0x7fffffffffffffff
@@ -40,6 +41,7 @@ type Rtcontext interface {
        RtcAddMeta(meta interface{}) error
        RtcGet() (interface{}, error)
        RtcAddLevel(handle interface{}, level string, value string) (interface{}, error)
+       RtcAddStatus(handle interface{}, value interface{}) (interface{}, error)
        RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error)
        RtcAddInstruction(handle interface{}, level string, insttype string, value interface{}) (interface{}, error)
        RtcDeletePair(handle interface{}) error
@@ -201,6 +203,26 @@ func (rtc *RunTimeContext) RtcAddOneLevel(pl interface{}, level string, value in
        return (interface{})(key), nil
 }
 
+// Add status under the given level and return new handle
+func (rtc *RunTimeContext) RtcAddStatus(handle interface{}, value interface{}) (interface{}, error) {
+
+       str := fmt.Sprintf("%v", handle)
+       sid := fmt.Sprintf("%v", rtc.cid)
+       if !strings.HasPrefix(str, sid) {
+               return nil, pkgerrors.Errorf("Not a valid run time context handle")
+       }
+       if value == nil {
+               return nil, pkgerrors.Errorf("Not a valid run time context resource value")
+       }
+
+       k := str + "status" + "/"
+       err := contextdb.Db.Put(k, value)
+       if err != nil {
+               return nil, pkgerrors.Errorf("Error adding run time context status: %s", err.Error())
+       }
+       return (interface{})(k), nil
+}
+
 // Add a resource under the given level and return new handle
 func (rtc *RunTimeContext) RtcAddResource(handle interface{}, resname string, value interface{}) (interface{}, error) {
 
index 6e17f87..fc8aa83 100644 (file)
@@ -19,8 +19,6 @@ package connector
 import (
        "log"
 
-       "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
-
        corev1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/meta"
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -52,7 +50,7 @@ type KubernetesConnector interface {
 // Reference is the interface that is implemented
 type Reference interface {
        //Create a kubernetes resource described by the yaml in yamlFilePath
-       Create(yamlFilePath string, namespace string, client KubernetesConnector) (string, error)
+       Create(yamlFilePath string, namespace string, label string, client KubernetesConnector) (string, error)
        //Delete a kubernetes resource described in the provided namespace
        Delete(yamlFilePath string, resname string, namespace string, client KubernetesConnector) error
 }
@@ -86,7 +84,7 @@ func TagPodsIfPresent(unstruct *unstructured.Unstructured, tag string) {
        if labels == nil {
                labels = map[string]string{}
        }
-       labels[config.GetConfiguration().KubernetesLabelName] = tag
+       labels["emco/deployment-id"] = tag
        podTemplateSpec.SetLabels(labels)
 
        updatedTemplate, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplateSpec)
index e5da129..7e0fce3 100644 (file)
@@ -18,67 +18,68 @@ package context
 
 import (
        "encoding/json"
-        "fmt"
-        "log"
-        "sync"
-        "strings"
-        "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
-        pkgerrors "github.com/pkg/errors"
-        res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
-        con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
-        "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
+       "fmt"
+       "log"
+       "strings"
+       "sync"
+
+       "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
+       "github.com/onap/multicloud-k8s/src/rsync/pkg/app"
+       con "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+       res "github.com/onap/multicloud-k8s/src/rsync/pkg/resource"
+       status "github.com/onap/multicloud-k8s/src/rsync/pkg/status"
+       pkgerrors "github.com/pkg/errors"
 )
 
 type CompositeAppContext struct {
-       cid    interface{}
+       cid            interface{}
        appsorder      string
        appsdependency string
-       appsmap []instMap
+       appsmap        []instMap
 }
 type clusterInfo struct {
-       name   string
+       name          string
        resorder      string
        resdependency string
-       ressmap []instMap
+       ressmap       []instMap
 }
 type instMap struct {
-       name    string
-       depinfo string
-       status  string
-       rerr    error
+       name     string
+       depinfo  string
+       status   string
+       rerr     error
        clusters []clusterInfo
 }
 
 func getInstMap(order string, dependency string, level string) ([]instMap, error) {
 
-        if order == "" {
-              return nil, pkgerrors.Errorf("Not a valid order value")
-        }
-        if dependency == "" {
-              return nil, pkgerrors.Errorf("Not a valid dependency value")
-        }
-
-        if !(level == "app" || level == "res") {
-              return nil, pkgerrors.Errorf("Not a valid level name given to create map")
-        }
+       if order == "" {
+               return nil, pkgerrors.Errorf("Not a valid order value")
+       }
+       if dependency == "" {
+               return nil, pkgerrors.Errorf("Not a valid dependency value")
+       }
 
+       if !(level == "app" || level == "res") {
+               return nil, pkgerrors.Errorf("Not a valid level name given to create map")
+       }
 
-        var aov map[string]interface{}
-        json.Unmarshal([]byte(order), &aov)
+       var aov map[string]interface{}
+       json.Unmarshal([]byte(order), &aov)
 
-        s := fmt.Sprintf("%vorder", level)
-        appso := aov[s].([]interface{})
-        var instmap = make([]instMap, len(appso))
+       s := fmt.Sprintf("%vorder", level)
+       appso := aov[s].([]interface{})
+       var instmap = make([]instMap, len(appso))
 
-        var adv map[string]interface{}
-        json.Unmarshal([]byte(dependency), &adv)
-        s = fmt.Sprintf("%vdependency", level)
-        appsd := adv[s].(map[string]interface{})
-        for i, u := range appso {
-                instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
-        }
+       var adv map[string]interface{}
+       json.Unmarshal([]byte(dependency), &adv)
+       s = fmt.Sprintf("%vdependency", level)
+       appsd := adv[s].(map[string]interface{})
+       for i, u := range appso {
+               instmap[i] = instMap{u.(string), appsd[u.(string)].(string), "none", nil, nil}
+       }
 
-        return instmap, nil
+       return instmap, nil
 }
 
 func deleteResource(clustername string, resname string, respath string) error {
@@ -94,7 +95,7 @@ func deleteResource(clustername string, resname string, respath string) error {
        var gp res.Resource
        err = gp.Delete(respath, resname, "default", c)
        if err != nil {
-               log.Println("Delete resource failed: " +  err.Error() + resname)
+               log.Println("Delete resource failed: " + err.Error() + resname)
                return err
        }
        log.Println("Resource succesfully deleted", resname)
@@ -102,7 +103,7 @@ func deleteResource(clustername string, resname string, respath string) error {
 
 }
 
-func createResource(clustername string, resname string, respath string) error {
+func createResource(clustername string, resname string, respath string, label string) error {
        k8sClient := app.KubernetesClient{}
        err := k8sClient.Init(clustername, resname)
        if err != nil {
@@ -113,9 +114,9 @@ func createResource(clustername string, resname string, respath string) error {
        var c con.KubernetesConnector
        c = &k8sClient
        var gp res.Resource
-       _, err = gp.Create(respath,"default", c)
+       _, err = gp.Create(respath, "default", label, c)
        if err != nil {
-               log.Println("Create failed: " +  err.Error() + resname)
+               log.Println("Create failed: " + err.Error() + resname)
                return err
        }
        log.Println("Resource succesfully created", resname)
@@ -152,7 +153,7 @@ func terminateResource(ac appcontext.AppContext, resmap instMap, appname string,
 
 }
 
-func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string) error {
+func instantiateResource(ac appcontext.AppContext, resmap instMap, appname string, clustername string, label string) error {
        rh, err := ac.GetResourceHandle(appname, clustername, resmap.name)
        if err != nil {
                return err
@@ -168,7 +169,7 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin
                if result[0] == "" {
                        return pkgerrors.Errorf("Resource name is nil")
                }
-               err = createResource(clustername, result[0], resval.(string))
+               err = createResource(clustername, result[0], resval.(string), label)
                if err != nil {
                        return err
                }
@@ -180,97 +181,102 @@ func instantiateResource(ac appcontext.AppContext, resmap instMap, appname strin
 
 }
 
-func  terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
-        var wg sync.WaitGroup
-        var chans = make([]chan int, len(ressmap))
-        for l := range chans {
-                chans[l] = make(chan int)
-        }
-        for i:=0; i<len(ressmap); i++ {
-                wg.Add(1)
-                go func(index int) {
-                        if ressmap[index].depinfo == "go" {
-                                ressmap[index].status = "start"
-                        } else {
-                                ressmap[index].status = "waiting"
-                                c := <- chans[index]
-                                if c != index {
+func terminateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+       var wg sync.WaitGroup
+       var chans = make([]chan int, len(ressmap))
+       for l := range chans {
+               chans[l] = make(chan int)
+       }
+       for i := 0; i < len(ressmap); i++ {
+               wg.Add(1)
+               go func(index int) {
+                       if ressmap[index].depinfo == "go" {
+                               ressmap[index].status = "start"
+                       } else {
+                               ressmap[index].status = "waiting"
+                               c := <-chans[index]
+                               if c != index {
                                        ressmap[index].status = "error"
-                                        ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+                                       ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
                                        wg.Done()
                                        return
-                                }
-                                ressmap[index].status = "start"
-                        }
-                        ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
-                        ressmap[index].status = "done"
-                        waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
-                        for j:=0; j<len(ressmap); j++ {
-                                if ressmap[j].depinfo == waitstr {
-                                        chans[j] <- j
-                                }
-                        }
-                        wg.Done()
-                }(i)
-        }
-        wg.Wait()
-        for k:=0; k<len(ressmap); k++ {
-                if ressmap[k].rerr != nil {
-                        return pkgerrors.Errorf("error during resources termination")
-                }
-        }
-        return nil
+                               }
+                               ressmap[index].status = "start"
+                       }
+                       ressmap[index].rerr = terminateResource(ac, ressmap[index], appname, clustername)
+                       ressmap[index].status = "done"
+                       waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
+                       for j := 0; j < len(ressmap); j++ {
+                               if ressmap[j].depinfo == waitstr {
+                                       chans[j] <- j
+                               }
+                       }
+                       wg.Done()
+               }(i)
+       }
+       wg.Wait()
+       for k := 0; k < len(ressmap); k++ {
+               if ressmap[k].rerr != nil {
+                       return pkgerrors.Errorf("error during resources termination")
+               }
+       }
+       return nil
 
 }
 
-func  instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
-        var wg sync.WaitGroup
-        var chans = make([]chan int, len(ressmap))
-        for l := range chans {
-                chans[l] = make(chan int)
-        }
-        for i:=0; i<len(ressmap); i++ {
-                wg.Add(1)
-                go func(index int) {
-                        if ressmap[index].depinfo == "go" {
-                                ressmap[index].status = "start"
-                        } else {
-                                ressmap[index].status = "waiting"
-                                c := <- chans[index]
-                                if c != index {
+func instantiateResources(ac appcontext.AppContext, ressmap []instMap, appname string, clustername string) error {
+       var wg sync.WaitGroup
+       var chans = make([]chan int, len(ressmap))
+       cid, _ := ac.GetCompositeAppHandle()
+
+       results := strings.Split(cid.(string), "/")
+       label := results[2] + "-" + appname
+
+       for l := range chans {
+               chans[l] = make(chan int)
+       }
+       for i := 0; i < len(ressmap); i++ {
+               wg.Add(1)
+               go func(index int) {
+                       if ressmap[index].depinfo == "go" {
+                               ressmap[index].status = "start"
+                       } else {
+                               ressmap[index].status = "waiting"
+                               c := <-chans[index]
+                               if c != index {
                                        ressmap[index].status = "error"
-                                        ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
+                                       ressmap[index].rerr = pkgerrors.Errorf("channel does not match")
                                        wg.Done()
                                        return
-                                }
-                                ressmap[index].status = "start"
-                        }
-                        ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername)
-                        ressmap[index].status = "done"
-                        waitstr := fmt.Sprintf("wait on %v",ressmap[index].name)
-                        for j:=0; j<len(ressmap); j++ {
-                                if ressmap[j].depinfo == waitstr {
-                                        chans[j] <- j
-                                }
-                        }
-                        wg.Done()
-                }(i)
-        }
-        wg.Wait()
-        for k:=0; k<len(ressmap); k++ {
-                if ressmap[k].rerr != nil {
-                        return pkgerrors.Errorf("error during resources instantiation")
-                }
-        }
-        return nil
+                               }
+                               ressmap[index].status = "start"
+                       }
+                       ressmap[index].rerr = instantiateResource(ac, ressmap[index], appname, clustername, label)
+                       ressmap[index].status = "done"
+                       waitstr := fmt.Sprintf("wait on %v", ressmap[index].name)
+                       for j := 0; j < len(ressmap); j++ {
+                               if ressmap[j].depinfo == waitstr {
+                                       chans[j] <- j
+                               }
+                       }
+                       wg.Done()
+               }(i)
+       }
+       wg.Wait()
+       for k := 0; k < len(ressmap); k++ {
+               if ressmap[k].rerr != nil {
+                       return pkgerrors.Errorf("error during resources instantiation")
+               }
+       }
+       return nil
 
 }
 
 func terminateApp(ac appcontext.AppContext, appmap instMap) error {
 
-        for i:=0; i<len(appmap.clusters); i++ {
+       for i := 0; i < len(appmap.clusters); i++ {
                err := terminateResources(ac, appmap.clusters[i].ressmap, appmap.name,
-                               appmap.clusters[i].name)
+                       appmap.clusters[i].name)
                if err != nil {
                        return err
                }
@@ -281,38 +287,41 @@ func terminateApp(ac appcontext.AppContext, appmap instMap) error {
 
 }
 
-
 func instantiateApp(ac appcontext.AppContext, appmap instMap) error {
 
-        for i:=0; i<len(appmap.clusters); i++ {
+       for i := 0; i < len(appmap.clusters); i++ {
                err := instantiateResources(ac, appmap.clusters[i].ressmap, appmap.name,
-                               appmap.clusters[i].name)
+                       appmap.clusters[i].name)
                if err != nil {
                        return err
                }
+               err = status.StartClusterWatcher(appmap.clusters[i].name)
+               if err != nil {
+                       log.Printf("Error starting Cluster Watcher %v: %v\n", appmap.clusters[i], err)
+               }
        }
        log.Println("Instantiation of app done: " + appmap.name)
        return nil
 
 }
 
-func  instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
+func instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
        var wg sync.WaitGroup
        var chans = make([]chan int, len(appsmap))
        for l := range chans {
                chans[l] = make(chan int)
        }
-        for i:=0; i<len(appsmap); i++ {
+       for i := 0; i < len(appsmap); i++ {
                wg.Add(1)
-                go func(index int) {
-                        if appsmap[index].depinfo == "go" {
+               go func(index int) {
+                       if appsmap[index].depinfo == "go" {
                                appsmap[index].status = "start"
                        } else {
                                appsmap[index].status = "waiting"
-                               c := <- chans[index]
+                               c := <-chans[index]
                                if c != index {
                                        appsmap[index].status = "error"
-                                        appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+                                       appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
                                        wg.Done()
                                        return
                                }
@@ -320,17 +329,17 @@ func  instantiateApps(ac appcontext.AppContext, appsmap []instMap) error {
                        }
                        appsmap[index].rerr = instantiateApp(ac, appsmap[index])
                        appsmap[index].status = "done"
-                       waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
-                       for j:=0; j<len(appsmap); j++ {
+                       waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
+                       for j := 0; j < len(appsmap); j++ {
                                if appsmap[j].depinfo == waitstr {
                                        chans[j] <- j
                                }
                        }
                        wg.Done()
-                }(i)
-        }
+               }(i)
+       }
        wg.Wait()
-       for k:=0; k<len(appsmap); k++ {
+       for k := 0; k < len(appsmap); k++ {
                if appsmap[k].rerr != nil {
                        return pkgerrors.Errorf("error during apps instantiation")
                }
@@ -343,45 +352,45 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
        ac := appcontext.AppContext{}
 
        _, err := ac.LoadAppContext(cid)
-        if err != nil {
-                return err
-        }
+       if err != nil {
+               return err
+       }
        instca.cid = cid
 
        appsorder, err := ac.GetAppInstruction("order")
-        if err != nil {
-                return err
-        }
+       if err != nil {
+               return err
+       }
        instca.appsorder = appsorder.(string)
        appsdependency, err := ac.GetAppInstruction("dependency")
-        if err != nil {
-                return err
-        }
+       if err != nil {
+               return err
+       }
        instca.appsdependency = appsdependency.(string)
-        instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
-        if err != nil {
-                return err
-        }
+       instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+       if err != nil {
+               return err
+       }
 
-       for j:=0; j<len(instca.appsmap); j++ {
+       for j := 0; j < len(instca.appsmap); j++ {
                clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
                if err != nil {
-                       return err
+                       return err
                }
                instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
-               for k:=0; k<len(clusternames); k++ {
+               for k := 0; k < len(clusternames); k++ {
                        instca.appsmap[j].clusters[k].name = clusternames[k]
                        resorder, err := ac.GetResourceInstruction(
-                                       instca.appsmap[j].name, clusternames[k], "order")
+                               instca.appsmap[j].name, clusternames[k], "order")
                        if err != nil {
-                               return err
+                               return err
                        }
                        instca.appsmap[j].clusters[k].resorder = resorder.(string)
 
                        resdependency, err := ac.GetResourceInstruction(
-                                       instca.appsmap[j].name, clusternames[k], "dependency")
+                               instca.appsmap[j].name, clusternames[k], "dependency")
                        if err != nil {
-                               return err
+                               return err
                        }
                        instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
 
@@ -389,36 +398,36 @@ func (instca *CompositeAppContext) InstantiateComApp(cid interface{}) error {
                                instca.appsmap[j].clusters[k].resorder,
                                instca.appsmap[j].clusters[k].resdependency, "res")
                        if err != nil {
-                               return err
+                               return err
                        }
                }
        }
        err = instantiateApps(ac, instca.appsmap)
-        if err != nil {
-                return err
-        }
+       if err != nil {
+               return err
+       }
 
        return nil
 }
 
 // Delete all the apps
-func  terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
+func terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
        var wg sync.WaitGroup
        var chans = make([]chan int, len(appsmap))
        for l := range chans {
                chans[l] = make(chan int)
        }
-        for i:=0; i<len(appsmap); i++ {
+       for i := 0; i < len(appsmap); i++ {
                wg.Add(1)
-                go func(index int) {
-                        if appsmap[index].depinfo == "go" {
+               go func(index int) {
+                       if appsmap[index].depinfo == "go" {
                                appsmap[index].status = "start"
                        } else {
                                appsmap[index].status = "waiting"
-                               c := <- chans[index]
+                               c := <-chans[index]
                                if c != index {
                                        appsmap[index].status = "error"
-                                        appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
+                                       appsmap[index].rerr = pkgerrors.Errorf("channel does not match")
                                        wg.Done()
                                        return
                                }
@@ -426,17 +435,17 @@ func  terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
                        }
                        appsmap[index].rerr = terminateApp(ac, appsmap[index])
                        appsmap[index].status = "done"
-                       waitstr := fmt.Sprintf("wait on %v",appsmap[index].name)
-                       for j:=0; j<len(appsmap); j++ {
+                       waitstr := fmt.Sprintf("wait on %v", appsmap[index].name)
+                       for j := 0; j < len(appsmap); j++ {
                                if appsmap[j].depinfo == waitstr {
                                        chans[j] <- j
                                }
                        }
                        wg.Done()
-                }(i)
-        }
+               }(i)
+       }
        wg.Wait()
-       for k:=0; k<len(appsmap); k++ {
+       for k := 0; k < len(appsmap); k++ {
                if appsmap[k].rerr != nil {
                        return pkgerrors.Errorf("error during apps instantiation")
                }
@@ -444,50 +453,51 @@ func  terminateApps(ac appcontext.AppContext, appsmap []instMap) error {
        return nil
 
 }
+
 // Delete all the resources for a given context
 func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
        ac := appcontext.AppContext{}
 
        _, err := ac.LoadAppContext(cid)
-        if err != nil {
-                return err
-        }
+       if err != nil {
+               return err
+       }
        instca.cid = cid
 
        appsorder, err := ac.GetAppInstruction("order")
-        if err != nil {
-                return err
-        }
+       if err != nil {
+               return err
+       }
        instca.appsorder = appsorder.(string)
        appsdependency, err := ac.GetAppInstruction("dependency")
-        if err != nil {
-                return err
-        }
+       if err != nil {
+               return err
+       }
        instca.appsdependency = appsdependency.(string)
-        instca.appsmap, err = getInstMap(instca.appsorder,instca.appsdependency, "app")
-        if err != nil {
-                return err
-        }
+       instca.appsmap, err = getInstMap(instca.appsorder, instca.appsdependency, "app")
+       if err != nil {
+               return err
+       }
 
-       for j:=0; j<len(instca.appsmap); j++ {
+       for j := 0; j < len(instca.appsmap); j++ {
                clusternames, err := ac.GetClusterNames(instca.appsmap[j].name)
                if err != nil {
-                       return err
+                       return err
                }
                instca.appsmap[j].clusters = make([]clusterInfo, len(clusternames))
-               for k:=0; k<len(clusternames); k++ {
+               for k := 0; k < len(clusternames); k++ {
                        instca.appsmap[j].clusters[k].name = clusternames[k]
                        resorder, err := ac.GetResourceInstruction(
-                                       instca.appsmap[j].name, clusternames[k], "order")
+                               instca.appsmap[j].name, clusternames[k], "order")
                        if err != nil {
-                               return err
+                               return err
                        }
                        instca.appsmap[j].clusters[k].resorder = resorder.(string)
 
                        resdependency, err := ac.GetResourceInstruction(
-                                       instca.appsmap[j].name, clusternames[k], "dependency")
+                               instca.appsmap[j].name, clusternames[k], "dependency")
                        if err != nil {
-                               return err
+                               return err
                        }
                        instca.appsmap[j].clusters[k].resdependency = resdependency.(string)
 
@@ -495,14 +505,14 @@ func (instca *CompositeAppContext) TerminateComApp(cid interface{}) error {
                                instca.appsmap[j].clusters[k].resorder,
                                instca.appsmap[j].clusters[k].resdependency, "res")
                        if err != nil {
-                               return err
+                               return err
                        }
                }
        }
        err = terminateApps(ac, instca.appsmap)
-        if err != nil {
-                return err
-        }
+       if err != nil {
+               return err
+       }
 
        return nil
 
index 8b45c34..2877e2a 100644 (file)
@@ -20,16 +20,15 @@ import (
        "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
        "k8s.io/apimachinery/pkg/runtime/schema"
 
-       utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
-       "github.com/onap/multicloud-k8s/src/rsync/pkg/internal/config"
        "github.com/onap/multicloud-k8s/src/rsync/pkg/connector"
+       utils "github.com/onap/multicloud-k8s/src/rsync/pkg/internal"
 )
 
 type Resource struct {
 }
 
 // Create deployment object in a specific Kubernetes cluster
-func (r Resource) Create(data string, namespace string, client connector.KubernetesConnector) (string, error) {
+func (r Resource) Create(data string, namespace string, label string, client connector.KubernetesConnector) (string, error) {
        if namespace == "" {
                namespace = "default"
        }
@@ -57,13 +56,15 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne
        if labels == nil {
                labels = map[string]string{}
        }
-       labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+       //labels[config.GetConfiguration().KubernetesLabelName] = client.GetInstanceID()
+       labels["emco/deployment-id"] = label
        unstruct.SetLabels(labels)
 
        // This checks if the resource we are creating has a podSpec in it
        // Eg: Deployment, StatefulSet, Job etc..
        // If a PodSpec is found, the label will be added to it too.
-       connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+       //connector.TagPodsIfPresent(unstruct, client.GetInstanceID())
+       connector.TagPodsIfPresent(unstruct, label)
 
        gvr := mapping.Resource
        var createdObj *unstructured.Unstructured
@@ -86,44 +87,44 @@ func (r Resource) Create(data string, namespace string, client connector.Kuberne
 
 // Delete an existing resource hosted in a specific Kubernetes cluster
 func (r Resource) Delete(data string, resname string, namespace string, client connector.KubernetesConnector) error {
-        if namespace == "" {
-                namespace = "default"
-        }
-
-        //Decode the yaml file to create a runtime.Object
-        unstruct := &unstructured.Unstructured{}
-        //Ignore the returned obj as we expect the data in unstruct
-        _, err := utils.DecodeYAMLData(data, unstruct)
-        if err != nil {
-                return pkgerrors.Wrap(err, "Decode deployment object error")
-        }
-
-        dynClient := client.GetDynamicClient()
-        mapper := client.GetMapper()
-
-        gvk := unstruct.GroupVersionKind()
-        mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
-        if err != nil {
-                return pkgerrors.Wrap(err, "Mapping kind to resource error")
-        }
-
-        gvr := mapping.Resource
-        deletePolicy := metav1.DeletePropagationForeground
-        opts := &metav1.DeleteOptions{
-                PropagationPolicy: &deletePolicy,
-        }
-
-        switch mapping.Scope.Name() {
-        case meta.RESTScopeNameNamespace:
-                err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
-        case meta.RESTScopeNameRoot:
-                err = dynClient.Resource(gvr).Delete(resname, opts)
-        default:
-                return pkgerrors.New("Got an unknown RESTSCopeName for mappin")
-        }
-
-        if err != nil {
-                return pkgerrors.Wrap(err, "Delete object error")
-        }
-        return nil
+       if namespace == "" {
+               namespace = "default"
+       }
+
+       //Decode the yaml file to create a runtime.Object
+       unstruct := &unstructured.Unstructured{}
+       //Ignore the returned obj as we expect the data in unstruct
+       _, err := utils.DecodeYAMLData(data, unstruct)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Decode deployment object error")
+       }
+
+       dynClient := client.GetDynamicClient()
+       mapper := client.GetMapper()
+
+       gvk := unstruct.GroupVersionKind()
+       mapping, err := mapper.RESTMapping(schema.GroupKind{Group: gvk.Group, Kind: gvk.Kind}, gvk.Version)
+       if err != nil {
+               return pkgerrors.Wrap(err, "Mapping kind to resource error")
+       }
+
+       gvr := mapping.Resource
+       deletePolicy := metav1.DeletePropagationForeground
+       opts := &metav1.DeleteOptions{
+               PropagationPolicy: &deletePolicy,
+       }
+
+       switch mapping.Scope.Name() {
+       case meta.RESTScopeNameNamespace:
+               err = dynClient.Resource(gvr).Namespace(namespace).Delete(resname, opts)
+       case meta.RESTScopeNameRoot:
+               err = dynClient.Resource(gvr).Delete(resname, opts)
+       default:
+               return pkgerrors.New("Got an unknown RESTSCopeName for mappin")
+       }
+
+       if err != nil {
+               return pkgerrors.Wrap(err, "Delete object error")
+       }
+       return nil
 }
index 28bffef..351da02 100644 (file)
 package status
 
 import (
+       "encoding/base64"
        "encoding/json"
        "fmt"
+       "strings"
        "sync"
 
        pkgerrors "github.com/pkg/errors"
        "github.com/sirupsen/logrus"
 
+       "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
        v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
        clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned"
        informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions"
+       appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/clientcmd"
 )
@@ -42,20 +46,75 @@ const monitorLabel = "emco/deployment-id"
 
 // HandleStatusUpdate for an application in a cluster
 // TODO: Add code for specific handling
-func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error {
-       logrus.Info("label::", id)
+func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) {
        //status := v.Status.ServiceStatuses
        //podStatus := v.Status.PodStatuses
-       // Store Pod Status in app context
-       out, _ := json.Marshal(v.Status)
-       logrus.Info("Status::", string(out))
-       return nil
+
+       // Get the contextId from the label (id)
+       result := strings.SplitN(id, "-", 2)
+       if result[0] == "" {
+               logrus.Info(clusterId, "::label is missing an appcontext identifier::", id)
+               return
+       }
+
+       if len(result) != 2 {
+               logrus.Info(clusterId, "::invalid label format::", id)
+               return
+       }
+
+       // Get the app from the label (id)
+       if result[1] == "" {
+               logrus.Info(clusterId, "::label is missing an app identifier::", id)
+               return
+       }
+
+       // Look up the contextId
+       var ac appcontext.AppContext
+       _, err := ac.LoadAppContext(result[0])
+       if err != nil {
+               logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err)
+               return
+       }
+
+       // produce yaml representation of the status
+       vjson, err := json.Marshal(v.Status)
+       if err != nil {
+               logrus.Info(clusterId, "::Error marshalling status information::", err)
+               return
+       }
+
+       // Get the handle for the context/app/cluster status object
+       handle, err := ac.GetStatusHandle(result[1], clusterId)
+       if err != nil {
+               // Expected first time
+               logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err)
+       }
+
+       // If status handle was not found, then create the status object in the appcontext
+       if handle == nil {
+               chandle, err := ac.GetClusterHandle(result[1], clusterId)
+               if err != nil {
+                       logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err)
+               } else {
+                       ac.AddStatus(chandle, string(vjson))
+               }
+       } else {
+               ac.UpdateStatusValue(handle, string(vjson))
+       }
+
+       return
 }
 
 // StartClusterWatcher watches for CR
 // configBytes - Kubectl file data
-func StartClusterWatcher(provider, name string, configBytes []byte) error {
-       key := provider + "+" + name
+func StartClusterWatcher(clusterId string) error {
+
+       configBytes, err := getKubeConfig(clusterId)
+       if err != nil {
+               return err
+       }
+
+       //key := provider + "+" + name
        // Get the lock
        channelData.Lock()
        defer channelData.Unlock()
@@ -63,10 +122,10 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
        if channelData.channels == nil {
                channelData.channels = make(map[string]chan struct{})
        }
-       _, ok := channelData.channels[key]
+       _, ok := channelData.channels[clusterId]
        if !ok {
                // Create Channel
-               channelData.channels[key] = make(chan struct{})
+               channelData.channels[clusterId] = make(chan struct{})
                // Create config
                config, err := clientcmd.RESTConfigFromKubeConfig(configBytes)
                if err != nil {
@@ -80,16 +139,16 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
                // Create Informer
                mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
                mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer()
-               go scheduleStatus(provider, name, channelData.channels[key], mInformer)
+               go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer)
        }
        return nil
 }
 
 // StopClusterWatcher stop watching a cluster
-func StopClusterWatcher(provider, name string) {
-       key := provider + "+" + name
+func StopClusterWatcher(clusterId string) {
+       //key := provider + "+" + name
        if channelData.channels != nil {
-               c, ok := channelData.channels[key]
+               c, ok := channelData.channels[clusterId]
                if ok {
                        close(c)
                }
@@ -108,7 +167,7 @@ func CloseAllClusterWatchers() {
 }
 
 // Per Cluster Go routine to watch CR
-func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) {
+func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
        handlers := cache.ResourceEventHandlerFuncs{
                AddFunc: func(obj interface{}) {
                        v, ok := obj.(*v1alpha1.ResourceBundleState)
@@ -116,7 +175,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
                                labels := v.GetLabels()
                                l, ok := labels[monitorLabel]
                                if ok {
-                                       HandleStatusUpdate(provider, name, l, v)
+                                       HandleStatusUpdate(clusterId, l, v)
                                }
                        }
                },
@@ -126,7 +185,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
                                labels := v.GetLabels()
                                l, ok := labels[monitorLabel]
                                if ok {
-                                       HandleStatusUpdate(provider, name, l, v)
+                                       HandleStatusUpdate(clusterId, l, v)
                                }
                        }
                },
@@ -137,3 +196,27 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
        s.AddEventHandler(handlers)
        s.Run(c)
 }
+
+// getKubeConfig uses the connectivity client to get the kubeconfig based on the name
+// of the clustername. This is written out to a file.
+// TODO - consolidate with other rsync methods to get kubeconfig files
+func getKubeConfig(clustername string) ([]byte, error) {
+
+       if !strings.Contains(clustername, "+") {
+               return nil, pkgerrors.New("Not a valid cluster name")
+       }
+       strs := strings.Split(clustername, "+")
+       if len(strs) != 2 {
+               return nil, pkgerrors.New("Not a valid cluster name")
+       }
+       kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+       if err != nil {
+               return nil, pkgerrors.New("Get kubeconfig failed")
+       }
+
+       dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+       if err != nil {
+               return nil, err
+       }
+       return dec, nil
+}