Create OVN network action controller from ncm
[multicloud/k8s.git] / src / ncm / pkg / module / cluster.go
1 /*
2  * Copyright 2020 Intel Corporation, Inc
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package module
18
19 import (
20         "context"
21         "encoding/json"
22         "time"
23
24         "github.com/onap/multicloud-k8s/src/ncm/pkg/grpc"
25         appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
26         "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/db"
27         log "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/logutils"
28         "github.com/onap/multicloud-k8s/src/orchestrator/pkg/infra/rpc"
29         installpb "github.com/onap/multicloud-k8s/src/rsync/pkg/grpc/installapp"
30         "gopkg.in/yaml.v2"
31
32         pkgerrors "github.com/pkg/errors"
33 )
34
35 // ClusterProvider contains the parameters needed for ClusterProviders
36 type ClusterProvider struct {
37         Metadata Metadata `json:"metadata"`
38 }
39
40 type Cluster struct {
41         Metadata Metadata `json:"metadata"`
42 }
43
44 type ClusterContent struct {
45         Kubeconfig string `json:"kubeconfig"`
46 }
47
48 type ClusterLabel struct {
49         LabelName string `json:"label-name"`
50 }
51
52 type ClusterKvPairs struct {
53         Metadata Metadata      `json:"metadata"`
54         Spec     ClusterKvSpec `json:"spec"`
55 }
56
57 type ClusterKvSpec struct {
58         Kv []map[string]interface{} `json:"kv"`
59 }
60
61 // ClusterProviderKey is the key structure that is used in the database
62 type ClusterProviderKey struct {
63         ClusterProviderName string `json:"provider"`
64 }
65
66 // ClusterKey is the key structure that is used in the database
67 type ClusterKey struct {
68         ClusterProviderName string `json:"provider"`
69         ClusterName         string `json:"cluster"`
70 }
71
72 // ClusterLabelKey is the key structure that is used in the database
73 type ClusterLabelKey struct {
74         ClusterProviderName string `json:"provider"`
75         ClusterName         string `json:"cluster"`
76         ClusterLabelName    string `json:"label"`
77 }
78
79 // LabelKey is the key structure that is used in the database
80 type LabelKey struct {
81         ClusterProviderName string `json:"provider"`
82         ClusterLabelName    string `json:"label"`
83 }
84
85 // ClusterKvPairsKey is the key structure that is used in the database
86 type ClusterKvPairsKey struct {
87         ClusterProviderName string `json:"provider"`
88         ClusterName         string `json:"cluster"`
89         ClusterKvPairsName  string `json:"kvname"`
90 }
91
92 const SEPARATOR = "+"
93 const CONTEXT_CLUSTER_APP = "network-intents"
94 const CONTEXT_CLUSTER_RESOURCE = "network-intents"
95
96 // ClusterManager is an interface exposes the Cluster functionality
97 type ClusterManager interface {
98         CreateClusterProvider(pr ClusterProvider) (ClusterProvider, error)
99         GetClusterProvider(name string) (ClusterProvider, error)
100         GetClusterProviders() ([]ClusterProvider, error)
101         DeleteClusterProvider(name string) error
102         CreateCluster(provider string, pr Cluster, qr ClusterContent) (Cluster, error)
103         GetCluster(provider, name string) (Cluster, error)
104         GetClusterContent(provider, name string) (ClusterContent, error)
105         GetClusterContext(provider, name string) (appcontext.AppContext, error)
106         GetClusters(provider string) ([]Cluster, error)
107         GetClustersWithLabel(provider, label string) ([]string, error)
108         DeleteCluster(provider, name string) error
109         ApplyNetworkIntents(provider, name string) error
110         TerminateNetworkIntents(provider, name string) error
111         CreateClusterLabel(provider, cluster string, pr ClusterLabel) (ClusterLabel, error)
112         GetClusterLabel(provider, cluster, label string) (ClusterLabel, error)
113         GetClusterLabels(provider, cluster string) ([]ClusterLabel, error)
114         DeleteClusterLabel(provider, cluster, label string) error
115         CreateClusterKvPairs(provider, cluster string, pr ClusterKvPairs) (ClusterKvPairs, error)
116         GetClusterKvPairs(provider, cluster, kvpair string) (ClusterKvPairs, error)
117         GetAllClusterKvPairs(provider, cluster string) ([]ClusterKvPairs, error)
118         DeleteClusterKvPairs(provider, cluster, kvpair string) error
119 }
120
121 // ClusterClient implements the Manager
122 // It will also be used to maintain some localized state
123 type ClusterClient struct {
124         db ClientDbInfo
125 }
126
127 // NewClusterClient returns an instance of the ClusterClient
128 // which implements the Manager
129 func NewClusterClient() *ClusterClient {
130         return &ClusterClient{
131                 db: ClientDbInfo{
132                         storeName:  "cluster",
133                         tagMeta:    "clustermetadata",
134                         tagContent: "clustercontent",
135                         tagContext: "clustercontext",
136                 },
137         }
138 }
139
140 // CreateClusterProvider - create a new Cluster Provider
141 func (v *ClusterClient) CreateClusterProvider(p ClusterProvider) (ClusterProvider, error) {
142
143         //Construct key and tag to select the entry
144         key := ClusterProviderKey{
145                 ClusterProviderName: p.Metadata.Name,
146         }
147
148         //Check if this ClusterProvider already exists
149         _, err := v.GetClusterProvider(p.Metadata.Name)
150         if err == nil {
151                 return ClusterProvider{}, pkgerrors.New("ClusterProvider already exists")
152         }
153
154         err = db.DBconn.Insert(v.db.storeName, key, nil, v.db.tagMeta, p)
155         if err != nil {
156                 return ClusterProvider{}, pkgerrors.Wrap(err, "Creating DB Entry")
157         }
158
159         return p, nil
160 }
161
162 // GetClusterProvider returns the ClusterProvider for corresponding name
163 func (v *ClusterClient) GetClusterProvider(name string) (ClusterProvider, error) {
164
165         //Construct key and tag to select the entry
166         key := ClusterProviderKey{
167                 ClusterProviderName: name,
168         }
169
170         value, err := db.DBconn.Find(v.db.storeName, key, v.db.tagMeta)
171         if err != nil {
172                 return ClusterProvider{}, pkgerrors.Wrap(err, "Get ClusterProvider")
173         }
174
175         //value is a byte array
176         if value != nil {
177                 cp := ClusterProvider{}
178                 err = db.DBconn.Unmarshal(value[0], &cp)
179                 if err != nil {
180                         return ClusterProvider{}, pkgerrors.Wrap(err, "Unmarshalling Value")
181                 }
182                 return cp, nil
183         }
184
185         return ClusterProvider{}, pkgerrors.New("Error getting ClusterProvider")
186 }
187
188 // GetClusterProviderList returns all of the ClusterProvider for corresponding name
189 func (v *ClusterClient) GetClusterProviders() ([]ClusterProvider, error) {
190
191         //Construct key and tag to select the entry
192         key := ClusterProviderKey{
193                 ClusterProviderName: "",
194         }
195
196         var resp []ClusterProvider
197         values, err := db.DBconn.Find(v.db.storeName, key, v.db.tagMeta)
198         if err != nil {
199                 return []ClusterProvider{}, pkgerrors.Wrap(err, "Get ClusterProviders")
200         }
201
202         for _, value := range values {
203                 cp := ClusterProvider{}
204                 err = db.DBconn.Unmarshal(value, &cp)
205                 if err != nil {
206                         return []ClusterProvider{}, pkgerrors.Wrap(err, "Unmarshalling Value")
207                 }
208                 resp = append(resp, cp)
209         }
210
211         return resp, nil
212 }
213
214 // DeleteClusterProvider the  ClusterProvider from database
215 func (v *ClusterClient) DeleteClusterProvider(name string) error {
216
217         //Construct key and tag to select the entry
218         key := ClusterProviderKey{
219                 ClusterProviderName: name,
220         }
221
222         err := db.DBconn.Remove(v.db.storeName, key)
223         if err != nil {
224                 return pkgerrors.Wrap(err, "Delete ClusterProvider Entry;")
225         }
226
227         return nil
228 }
229
230 // CreateCluster - create a new Cluster for a cluster-provider
231 func (v *ClusterClient) CreateCluster(provider string, p Cluster, q ClusterContent) (Cluster, error) {
232
233         //Construct key and tag to select the entry
234         key := ClusterKey{
235                 ClusterProviderName: provider,
236                 ClusterName:         p.Metadata.Name,
237         }
238
239         //Verify ClusterProvider already exists
240         _, err := v.GetClusterProvider(provider)
241         if err != nil {
242                 return Cluster{}, pkgerrors.New("ClusterProvider does not exist")
243         }
244
245         //Check if this Cluster already exists
246         _, err = v.GetCluster(provider, p.Metadata.Name)
247         if err == nil {
248                 return Cluster{}, pkgerrors.New("Cluster already exists")
249         }
250
251         err = db.DBconn.Insert(v.db.storeName, key, nil, v.db.tagMeta, p)
252         if err != nil {
253                 return Cluster{}, pkgerrors.Wrap(err, "Creating DB Entry")
254         }
255         err = db.DBconn.Insert(v.db.storeName, key, nil, v.db.tagContent, q)
256         if err != nil {
257                 return Cluster{}, pkgerrors.Wrap(err, "Creating DB Entry")
258         }
259
260         return p, nil
261 }
262
263 // GetCluster returns the Cluster for corresponding provider and name
264 func (v *ClusterClient) GetCluster(provider, name string) (Cluster, error) {
265         //Construct key and tag to select the entry
266         key := ClusterKey{
267                 ClusterProviderName: provider,
268                 ClusterName:         name,
269         }
270
271         value, err := db.DBconn.Find(v.db.storeName, key, v.db.tagMeta)
272         if err != nil {
273                 return Cluster{}, pkgerrors.Wrap(err, "Get Cluster")
274         }
275
276         //value is a byte array
277         if value != nil {
278                 cl := Cluster{}
279                 err = db.DBconn.Unmarshal(value[0], &cl)
280                 if err != nil {
281                         return Cluster{}, pkgerrors.Wrap(err, "Unmarshalling Value")
282                 }
283                 return cl, nil
284         }
285
286         return Cluster{}, pkgerrors.New("Error getting Cluster")
287 }
288
289 // GetClusterContent returns the ClusterContent for corresponding provider and name
290 func (v *ClusterClient) GetClusterContent(provider, name string) (ClusterContent, error) {
291         //Construct key and tag to select the entry
292         key := ClusterKey{
293                 ClusterProviderName: provider,
294                 ClusterName:         name,
295         }
296
297         value, err := db.DBconn.Find(v.db.storeName, key, v.db.tagContent)
298         if err != nil {
299                 return ClusterContent{}, pkgerrors.Wrap(err, "Get Cluster Content")
300         }
301
302         //value is a byte array
303         if value != nil {
304                 cc := ClusterContent{}
305                 err = db.DBconn.Unmarshal(value[0], &cc)
306                 if err != nil {
307                         return ClusterContent{}, pkgerrors.Wrap(err, "Unmarshalling Value")
308                 }
309                 return cc, nil
310         }
311
312         return ClusterContent{}, pkgerrors.New("Error getting Cluster Content")
313 }
314
315 // GetClusterContext returns the AppContext for corresponding provider and name
316 func (v *ClusterClient) GetClusterContext(provider, name string) (appcontext.AppContext, error) {
317         //Construct key and tag to select the entry
318         key := ClusterKey{
319                 ClusterProviderName: provider,
320                 ClusterName:         name,
321         }
322
323         value, err := db.DBconn.Find(v.db.storeName, key, v.db.tagContext)
324         if err != nil {
325                 return appcontext.AppContext{}, pkgerrors.Wrap(err, "Get Cluster Context")
326         }
327
328         //value is a byte array
329         if value != nil {
330                 ctxVal := string(value[0])
331                 var cc appcontext.AppContext
332                 _, err = cc.LoadAppContext(ctxVal)
333                 if err != nil {
334                         return appcontext.AppContext{}, pkgerrors.Wrap(err, "Reinitializing Cluster AppContext")
335                 }
336                 return cc, nil
337         }
338
339         return appcontext.AppContext{}, pkgerrors.New("Error getting Cluster AppContext")
340 }
341
342 // GetClusters returns all the Clusters for corresponding provider
343 func (v *ClusterClient) GetClusters(provider string) ([]Cluster, error) {
344         //Construct key and tag to select the entry
345         key := ClusterKey{
346                 ClusterProviderName: provider,
347                 ClusterName:         "",
348         }
349
350         values, err := db.DBconn.Find(v.db.storeName, key, v.db.tagMeta)
351         if err != nil {
352                 return []Cluster{}, pkgerrors.Wrap(err, "Get Clusters")
353         }
354
355         var resp []Cluster
356
357         for _, value := range values {
358                 cp := Cluster{}
359                 err = db.DBconn.Unmarshal(value, &cp)
360                 if err != nil {
361                         return []Cluster{}, pkgerrors.Wrap(err, "Unmarshalling Value")
362                 }
363                 resp = append(resp, cp)
364         }
365
366         return resp, nil
367 }
368
369 // GetClustersWithLabel returns all the Clusters with Labels for provider
370 // Support Query like /cluster-providers/{Provider}/clusters?label={label}
371 func (v *ClusterClient) GetClustersWithLabel(provider, label string) ([]string, error) {
372         //Construct key and tag to select the entry
373         key := LabelKey{
374                 ClusterProviderName: provider,
375                 ClusterLabelName:    label,
376         }
377
378         values, err := db.DBconn.Find(v.db.storeName, key, "cluster")
379         if err != nil {
380                 return []string{}, pkgerrors.Wrap(err, "Get Clusters by label")
381         }
382         var resp []string
383
384         for _, value := range values {
385                 cp := string(value)
386                 resp = append(resp, cp)
387         }
388
389         return resp, nil
390 }
391
392 // DeleteCluster the  Cluster from database
393 func (v *ClusterClient) DeleteCluster(provider, name string) error {
394         //Construct key and tag to select the entry
395         key := ClusterKey{
396                 ClusterProviderName: provider,
397                 ClusterName:         name,
398         }
399         _, err := v.GetClusterContext(provider, name)
400         if err == nil {
401                 return pkgerrors.Errorf("Cannot delete cluster until context is deleted: %v, %v", provider, name)
402         }
403
404         err = db.DBconn.Remove(v.db.storeName, key)
405         if err != nil {
406                 return pkgerrors.Wrap(err, "Delete Cluster Entry;")
407         }
408
409         return nil
410 }
411
412 // Apply Network Intents associated with a cluster
413 func (v *ClusterClient) ApplyNetworkIntents(provider, name string) error {
414
415         _, err := v.GetClusterContext(provider, name)
416         if err == nil {
417                 return pkgerrors.Errorf("Cluster network intents have already been applied: %v, %v", provider, name)
418         }
419
420         type resource struct {
421                 name  string
422                 value string
423         }
424
425         var resources []resource
426
427         // Find all Network Intents for this cluster
428         networkIntents, err := NewNetworkClient().GetNetworks(provider, name)
429         if err != nil {
430                 return pkgerrors.Wrap(err, "Error finding Network Intents")
431         }
432         for _, intent := range networkIntents {
433                 var crNetwork = CrNetwork{
434                         ApiVersion: NETWORK_APIVERSION,
435                         Kind:       NETWORK_KIND,
436                 }
437                 crNetwork.Network = intent
438                 // Produce the yaml CR document for each intent
439                 y, err := yaml.Marshal(&crNetwork)
440                 if err != nil {
441                         log.Info("Error marshalling network intent to yaml", log.Fields{
442                                 "error":  err,
443                                 "intent": intent,
444                         })
445                         continue
446                 }
447                 resources = append(resources, resource{
448                         name:  intent.Metadata.Name + SEPARATOR + NETWORK_KIND,
449                         value: string(y),
450                 })
451         }
452
453         // Find all Provider Network Intents for this cluster
454         providerNetworkIntents, err := NewProviderNetClient().GetProviderNets(provider, name)
455         if err != nil {
456                 return pkgerrors.Wrap(err, "Error finding Provider Network Intents")
457         }
458         for _, intent := range providerNetworkIntents {
459                 var crProviderNet = CrProviderNet{
460                         ApiVersion: PROVIDER_NETWORK_APIVERSION,
461                         Kind:       PROVIDER_NETWORK_KIND,
462                 }
463                 crProviderNet.ProviderNet = intent
464                 // Produce the yaml CR document for each intent
465                 y, err := yaml.Marshal(&crProviderNet)
466                 if err != nil {
467                         log.Info("Error marshalling provider network intent to yaml", log.Fields{
468                                 "error":  err,
469                                 "intent": intent,
470                         })
471                         continue
472                 }
473                 resources = append(resources, resource{
474                         name:  intent.Metadata.Name + SEPARATOR + PROVIDER_NETWORK_KIND,
475                         value: string(y),
476                 })
477         }
478
479         if len(resources) == 0 {
480                 return nil
481         }
482
483         // Make an app context for the network intent resources
484         ac := appcontext.AppContext{}
485         ctxVal, err := ac.InitAppContext()
486         if err != nil {
487                 return pkgerrors.Wrap(err, "Error creating AppContext")
488         }
489         handle, err := ac.CreateCompositeApp()
490         if err != nil {
491                 return pkgerrors.Wrap(err, "Error creating AppContext CompositeApp")
492         }
493
494         // Add an app (fixed value) to the app context
495         apphandle, err := ac.AddApp(handle, CONTEXT_CLUSTER_APP)
496         if err != nil {
497                 cleanuperr := ac.DeleteCompositeApp()
498                 if cleanuperr != nil {
499                         log.Warn("Error cleaning AppContext CompositeApp create failure", log.Fields{
500                                 "cluster-provider": provider,
501                                 "cluster":          name,
502                         })
503                 }
504                 return pkgerrors.Wrap(err, "Error adding App to AppContext")
505         }
506
507         // Add an app order instruction
508         appinstr := struct {
509                 Apporder []string `json:"apporder"`
510         }{
511                 []string{CONTEXT_CLUSTER_APP},
512         }
513         jinstr, _ := json.Marshal(appinstr)
514
515         appdepinstr := struct {
516                 Appdep map[string]string `json:"appdependency"`
517         }{
518                 map[string]string{CONTEXT_CLUSTER_APP: "go"},
519         }
520         jdep, _ := json.Marshal(appdepinstr)
521
522         _, err = ac.AddInstruction(handle, "app", "order", string(jinstr))
523         _, err = ac.AddInstruction(handle, "app", "dependency", string(jdep))
524
525         // Add a cluster to the app
526         clusterhandle, err := ac.AddCluster(apphandle, provider+SEPARATOR+name)
527         if err != nil {
528                 cleanuperr := ac.DeleteCompositeApp()
529                 if cleanuperr != nil {
530                         log.Warn("Error cleaning AppContext after add cluster failure", log.Fields{
531                                 "cluster-provider": provider,
532                                 "cluster":          name,
533                         })
534                 }
535                 return pkgerrors.Wrap(err, "Error adding Cluster to AppContext")
536         }
537
538         // add the resources to the app context
539
540         var orderinstr struct {
541                 Resorder []string `json:"resorder"`
542         }
543         var depinstr struct {
544                 Resdep map[string]string `json:"resdependency"`
545         }
546         resdep := make(map[string]string)
547         for _, resource := range resources {
548                 orderinstr.Resorder = append(orderinstr.Resorder, resource.name)
549                 resdep[resource.name] = "go"
550                 _, err = ac.AddResource(clusterhandle, resource.name, resource.value)
551                 if err != nil {
552                         cleanuperr := ac.DeleteCompositeApp()
553                         if cleanuperr != nil {
554                                 log.Warn("Error cleaning AppContext after add resource failure", log.Fields{
555                                         "cluster-provider": provider,
556                                         "cluster":          name,
557                                         "resource":         resource.name,
558                                 })
559                         }
560                         return pkgerrors.Wrap(err, "Error adding Resource to AppContext")
561                 }
562         }
563         jresord, _ := json.Marshal(orderinstr)
564         depinstr.Resdep = resdep
565         jresdep, _ := json.Marshal(depinstr)
566         _, err = ac.AddInstruction(clusterhandle, "resource", "order", string(jresord))
567         _, err = ac.AddInstruction(clusterhandle, "resource", "dependency", string(jresdep))
568
569         // save the context in the cluster db record
570         key := ClusterKey{
571                 ClusterProviderName: provider,
572                 ClusterName:         name,
573         }
574         err = db.DBconn.Insert(v.db.storeName, key, nil, v.db.tagContext, ctxVal)
575         if err != nil {
576                 cleanuperr := ac.DeleteCompositeApp()
577                 if cleanuperr != nil {
578                         log.Warn("Error cleaning AppContext after DB insert failure", log.Fields{
579                                 "cluster-provider": provider,
580                                 "cluster":          name,
581                         })
582                 }
583                 return pkgerrors.Wrap(err, "Error adding AppContext to DB")
584         }
585
586         // call resource synchronizer to instantiate the CRs in the cluster
587         conn := rpc.GetRpcConn(grpc.RsyncName)
588         if conn == nil {
589                 grpc.InitRsyncClient()
590                 conn = rpc.GetRpcConn(grpc.RsyncName)
591         }
592
593         var rpcClient installpb.InstallappClient
594         var installRes *installpb.InstallAppResponse
595         ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
596         defer cancel()
597
598         if conn != nil {
599                 rpcClient = installpb.NewInstallappClient(conn)
600                 installReq := new(installpb.InstallAppRequest)
601                 installReq.AppContext = ctxVal.(string)
602                 installRes, err = rpcClient.InstallApp(ctx, installReq)
603                 if err == nil {
604                         log.Info("Response from InstappApp GRPC call", log.Fields{
605                                 "Succeeded": installRes.AppContextInstalled,
606                                 "Message":   installRes.AppContextInstallMessage,
607                         })
608                 }
609         } else {
610                 return pkgerrors.Errorf("InstallApp Failed - Could not get InstallAppClient: %v", grpc.RsyncName)
611         }
612
613         return nil
614 }
615
616 // Terminate Network Intents associated with a cluster
617 func (v *ClusterClient) TerminateNetworkIntents(provider, name string) error {
618         context, err := v.GetClusterContext(provider, name)
619         if err != nil {
620                 return pkgerrors.Wrapf(err, "Error finding AppContext for cluster: %v, %v", provider, name)
621         }
622
623         // TODO: call resource synchronizer to terminate the CRs in the cluster
624
625         // remove the app context
626         cleanuperr := context.DeleteCompositeApp()
627         if cleanuperr != nil {
628                 log.Warn("Error deleted AppContext", log.Fields{
629                         "cluster-provider": provider,
630                         "cluster":          name,
631                 })
632         }
633
634         // remove the app context field from the cluster db record
635         key := ClusterKey{
636                 ClusterProviderName: provider,
637                 ClusterName:         name,
638         }
639         err = db.DBconn.RemoveTag(v.db.storeName, key, v.db.tagContext)
640         if err != nil {
641                 log.Warn("Error removing AppContext from Cluster document", log.Fields{
642                         "cluster-provider": provider,
643                         "cluster":          name,
644                 })
645         }
646         return nil
647 }
648
649 // CreateClusterLabel - create a new Cluster Label mongo document for a cluster-provider/cluster
650 func (v *ClusterClient) CreateClusterLabel(provider string, cluster string, p ClusterLabel) (ClusterLabel, error) {
651         //Construct key and tag to select the entry
652         key := ClusterLabelKey{
653                 ClusterProviderName: provider,
654                 ClusterName:         cluster,
655                 ClusterLabelName:    p.LabelName,
656         }
657
658         //Verify Cluster already exists
659         _, err := v.GetCluster(provider, cluster)
660         if err != nil {
661                 return ClusterLabel{}, pkgerrors.New("Cluster does not exist")
662         }
663
664         //Check if this ClusterLabel already exists
665         _, err = v.GetClusterLabel(provider, cluster, p.LabelName)
666         if err == nil {
667                 return ClusterLabel{}, pkgerrors.New("Cluster Label already exists")
668         }
669
670         err = db.DBconn.Insert(v.db.storeName, key, nil, v.db.tagMeta, p)
671         if err != nil {
672                 return ClusterLabel{}, pkgerrors.Wrap(err, "Creating DB Entry")
673         }
674
675         return p, nil
676 }
677
678 // GetClusterLabel returns the Cluster for corresponding provider, cluster and label
679 func (v *ClusterClient) GetClusterLabel(provider, cluster, label string) (ClusterLabel, error) {
680         //Construct key and tag to select the entry
681         key := ClusterLabelKey{
682                 ClusterProviderName: provider,
683                 ClusterName:         cluster,
684                 ClusterLabelName:    label,
685         }
686
687         value, err := db.DBconn.Find(v.db.storeName, key, v.db.tagMeta)
688         if err != nil {
689                 return ClusterLabel{}, pkgerrors.Wrap(err, "Get Cluster")
690         }
691
692         //value is a byte array
693         if value != nil {
694                 cl := ClusterLabel{}
695                 err = db.DBconn.Unmarshal(value[0], &cl)
696                 if err != nil {
697                         return ClusterLabel{}, pkgerrors.Wrap(err, "Unmarshalling Value")
698                 }
699                 return cl, nil
700         }
701
702         return ClusterLabel{}, pkgerrors.New("Error getting Cluster")
703 }
704
705 // GetClusterLabels returns the Cluster Labels for corresponding provider and cluster
706 func (v *ClusterClient) GetClusterLabels(provider, cluster string) ([]ClusterLabel, error) {
707         //Construct key and tag to select the entry
708         key := ClusterLabelKey{
709                 ClusterProviderName: provider,
710                 ClusterName:         cluster,
711                 ClusterLabelName:    "",
712         }
713
714         values, err := db.DBconn.Find(v.db.storeName, key, v.db.tagMeta)
715         if err != nil {
716                 return []ClusterLabel{}, pkgerrors.Wrap(err, "Get Cluster Labels")
717         }
718
719         var resp []ClusterLabel
720
721         for _, value := range values {
722                 cp := ClusterLabel{}
723                 err = db.DBconn.Unmarshal(value, &cp)
724                 if err != nil {
725                         return []ClusterLabel{}, pkgerrors.Wrap(err, "Unmarshalling Value")
726                 }
727                 resp = append(resp, cp)
728         }
729
730         return resp, nil
731 }
732
733 // Delete the Cluster Label from database
734 func (v *ClusterClient) DeleteClusterLabel(provider, cluster, label string) error {
735         //Construct key and tag to select the entry
736         key := ClusterLabelKey{
737                 ClusterProviderName: provider,
738                 ClusterName:         cluster,
739                 ClusterLabelName:    label,
740         }
741
742         err := db.DBconn.Remove(v.db.storeName, key)
743         if err != nil {
744                 return pkgerrors.Wrap(err, "Delete ClusterLabel Entry;")
745         }
746
747         return nil
748 }
749
750 // CreateClusterKvPairs - Create a New Cluster KV pairs document
751 func (v *ClusterClient) CreateClusterKvPairs(provider string, cluster string, p ClusterKvPairs) (ClusterKvPairs, error) {
752         key := ClusterKvPairsKey{
753                 ClusterProviderName: provider,
754                 ClusterName:         cluster,
755                 ClusterKvPairsName:  p.Metadata.Name,
756         }
757
758         //Verify Cluster already exists
759         _, err := v.GetCluster(provider, cluster)
760         if err != nil {
761                 return ClusterKvPairs{}, pkgerrors.New("Cluster does not exist")
762         }
763
764         //Check if this ClusterKvPairs already exists
765         _, err = v.GetClusterKvPairs(provider, cluster, p.Metadata.Name)
766         if err == nil {
767                 return ClusterKvPairs{}, pkgerrors.New("Cluster KV Pair already exists")
768         }
769
770         err = db.DBconn.Insert(v.db.storeName, key, nil, v.db.tagMeta, p)
771         if err != nil {
772                 return ClusterKvPairs{}, pkgerrors.Wrap(err, "Creating DB Entry")
773         }
774
775         return p, nil
776 }
777
778 // GetClusterKvPairs returns the Cluster KeyValue pair for corresponding provider, cluster and KV pair name
779 func (v *ClusterClient) GetClusterKvPairs(provider, cluster, kvpair string) (ClusterKvPairs, error) {
780         //Construct key and tag to select entry
781         key := ClusterKvPairsKey{
782                 ClusterProviderName: provider,
783                 ClusterName:         cluster,
784                 ClusterKvPairsName:  kvpair,
785         }
786
787         value, err := db.DBconn.Find(v.db.storeName, key, v.db.tagMeta)
788         if err != nil {
789                 return ClusterKvPairs{}, pkgerrors.Wrap(err, "Get Cluster")
790         }
791
792         //value is a byte array
793         if value != nil {
794                 ckvp := ClusterKvPairs{}
795                 err = db.DBconn.Unmarshal(value[0], &ckvp)
796                 if err != nil {
797                         return ClusterKvPairs{}, pkgerrors.Wrap(err, "Unmarshalling Value")
798                 }
799                 return ckvp, nil
800         }
801
802         return ClusterKvPairs{}, pkgerrors.New("Error getting Cluster")
803 }
804
805 // GetAllClusterKvPairs returns the Cluster Kv Pairs for corresponding provider and cluster
806 func (v *ClusterClient) GetAllClusterKvPairs(provider, cluster string) ([]ClusterKvPairs, error) {
807         //Construct key and tag to select the entry
808         key := ClusterKvPairsKey{
809                 ClusterProviderName: provider,
810                 ClusterName:         cluster,
811                 ClusterKvPairsName:  "",
812         }
813
814         values, err := db.DBconn.Find(v.db.storeName, key, v.db.tagMeta)
815         if err != nil {
816                 return []ClusterKvPairs{}, pkgerrors.Wrap(err, "Get Cluster KV Pairs")
817         }
818
819         var resp []ClusterKvPairs
820
821         for _, value := range values {
822                 cp := ClusterKvPairs{}
823                 err = db.DBconn.Unmarshal(value, &cp)
824                 if err != nil {
825                         return []ClusterKvPairs{}, pkgerrors.Wrap(err, "Unmarshalling Value")
826                 }
827                 resp = append(resp, cp)
828         }
829
830         return resp, nil
831 }
832
833 // DeleteClusterKvPairs the  ClusterKvPairs from database
834 func (v *ClusterClient) DeleteClusterKvPairs(provider, cluster, kvpair string) error {
835         //Construct key and tag to select entry
836         key := ClusterKvPairsKey{
837                 ClusterProviderName: provider,
838                 ClusterName:         cluster,
839                 ClusterKvPairsName:  kvpair,
840         }
841
842         err := db.DBconn.Remove(v.db.storeName, key)
843         if err != nil {
844                 return pkgerrors.Wrap(err, "Delete ClusterKvPairs Entry;")
845         }
846
847         return nil
848 }