initial codebase for kube2msb 81/8881/2
authorHuabingZhao <zhao.huabing@zte.com.cn>
Mon, 28 Aug 2017 02:53:35 +0000 (10:53 +0800)
committerHuabingZhao <zhao.huabing@zte.com.cn>
Mon, 28 Aug 2017 03:10:16 +0000 (11:10 +0800)
Issue-Id: OOM-61
Change-Id: Ibf70557f1e9277bbe07d8e0e91bf6b125cecb144
Signed-off-by: HuabingZhao <zhao.huabing@zte.com.cn>
kube2msb/src/kube2msb.go [new file with mode: 0644]
kube2msb/src/kube_work.go [new file with mode: 0644]
kube2msb/src/msb_client.go [new file with mode: 0644]
kube2msb/src/msb_work.go [new file with mode: 0644]
kube2msb/src/types.go [new file with mode: 0644]

diff --git a/kube2msb/src/kube2msb.go b/kube2msb/src/kube2msb.go
new file mode 100644 (file)
index 0000000..627405e
--- /dev/null
@@ -0,0 +1,288 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+       "flag"
+       "fmt"
+       "log"
+       "net/url"
+       "os"
+       "reflect"
+       "time"
+
+       kapi "k8s.io/kubernetes/pkg/api"
+       kcache "k8s.io/kubernetes/pkg/client/cache"
+       kclient "k8s.io/kubernetes/pkg/client/unversioned"
+       kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
+       kframework "k8s.io/kubernetes/pkg/controller/framework"
+       kselector "k8s.io/kubernetes/pkg/fields"
+       klabels "k8s.io/kubernetes/pkg/labels"
+)
+
+var (
+       argMSBUrl        = flag.String("msb_url", "", "URL to MSB backend")
+       argKubeMasterUrl = flag.String("kube_master_url", "", "Url to reach kubernetes master. Env variables in this flag will be expanded.")
+       addMap           = make(map[string]*kapi.Pod)
+       deleteMap        = make(map[string]*kapi.Pod)
+       nodeSelector     = klabels.Everything()
+)
+
+const resyncPeriod = 5 * time.Second
+
+func getMSBUrl() (string, error) {
+       if *argMSBUrl == "" {
+               return "", fmt.Errorf("no --msb_url specified")
+       }
+       parsedUrl, err := url.Parse(os.ExpandEnv(*argMSBUrl))
+       if err != nil {
+               return "", fmt.Errorf("failed to parse --msb_url %s - %v", *argMSBUrl, err)
+       }
+       if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" {
+               return "", fmt.Errorf("invalid --msb_url specified %s", *argMSBUrl)
+       }
+       return parsedUrl.String(), nil
+}
+
+func newMSBClient() (Client, error) {
+       msbUrl, err := getMSBUrl()
+       if err != nil {
+               return nil, err
+       }
+
+       client, err := newMSBAgent(msbUrl)
+       if err != nil {
+               return nil, err
+       }
+       return client, nil
+}
+
+func getKubeMasterUrl() (string, error) {
+       if *argKubeMasterUrl == "" {
+               return "", fmt.Errorf("no --kube_master_url specified")
+       }
+       parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl))
+       if err != nil {
+               return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err)
+       }
+       if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" {
+               return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl)
+       }
+       return parsedUrl.String(), nil
+}
+
+func newKubeClient() (*kclient.Client, error) {
+       masterUrl, err := getKubeMasterUrl()
+       if err != nil {
+               return nil, err
+       }
+       overrides := &kclientcmd.ConfigOverrides{}
+       overrides.ClusterInfo.Server = masterUrl
+
+       rules := kclientcmd.NewDefaultClientConfigLoadingRules()
+       kubeConfig, err := kclientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
+
+       if err != nil {
+               log.Println("Error creating Kube Config", err)
+               return nil, err
+       }
+       kubeConfig.Host = masterUrl
+
+       log.Printf("Using %s for kubernetes master", kubeConfig.Host)
+       return kclient.New(kubeConfig)
+}
+
+// Returns a cache.ListWatch that gets all changes to services.
+func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch {
+       return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kselector.Everything())
+}
+
+func sendServiceWork(action KubeWorkAction, queue chan<- KubeWork, serviceObj interface{}) {
+       if service, ok := serviceObj.(*kapi.Service); ok {
+               log.Println("Service Action: ", action, " for service ", service.Name)
+               queue <- KubeWork{
+                       Action:  action,
+                       Service: service,
+               }
+       }
+}
+
+func watchForServices(kubeClient *kclient.Client, queue chan<- KubeWork) {
+       _, svcController := kframework.NewInformer(
+               createServiceLW(kubeClient),
+               &kapi.Service{},
+               resyncPeriod,
+               kframework.ResourceEventHandlerFuncs{
+                       AddFunc: func(obj interface{}) {
+                               sendServiceWork(KubeWorkAddService, queue, obj)
+                       },
+                       DeleteFunc: func(obj interface{}) {
+                               sendServiceWork(KubeWorkRemoveService, queue, obj)
+                       },
+                       UpdateFunc: func(oldObj, newObj interface{}) {
+                               if reflect.DeepEqual(newObj, oldObj) == false {
+                                       sendServiceWork(KubeWorkUpdateService, queue, newObj)
+                               }
+                       },
+               },
+       )
+       stop := make(chan struct{})
+       go svcController.Run(stop)
+}
+
+// Returns a cache.ListWatch that gets all changes to Pods.
+func createPodLW(kubeClient *kclient.Client) *kcache.ListWatch {
+       return kcache.NewListWatchFromClient(kubeClient, "pods", kapi.NamespaceAll, kselector.Everything())
+}
+
+// Dispatch the notifications for Pods by type to the worker
+func sendPodWork(action KubeWorkAction, queue chan<- KubeWork, podObj interface{}) {
+       if pod, ok := podObj.(*kapi.Pod); ok {
+               log.Println("Pod Action: ", action, " for Pod:", pod.Name)
+               queue <- KubeWork{
+                       Action: action,
+                       Pod:    pod,
+               }
+       }
+}
+
+// Launch the go routine to watch notifications for Pods.
+func watchForPods(kubeClient *kclient.Client, queue chan<- KubeWork) {
+       var podController *kframework.Controller
+       _, podController = kframework.NewInformer(
+               createPodLW(kubeClient),
+               &kapi.Pod{},
+               resyncPeriod,
+               kframework.ResourceEventHandlerFuncs{
+                       AddFunc: func(obj interface{}) {
+                               sendPodWork(KubeWorkAddPod, queue, obj)
+                       },
+                       DeleteFunc: func(obj interface{}) {
+                               if o, ok := obj.(*kapi.Pod); ok {
+                                       if _, ok := deleteMap[o.Name]; ok {
+                                               delete(deleteMap, o.Name)
+                                       }
+                               }
+                               sendPodWork(KubeWorkRemovePod, queue, obj)
+                       },
+                       UpdateFunc: func(oldObj, newObj interface{}) {
+                               o, n := oldObj.(*kapi.Pod), newObj.(*kapi.Pod)
+                               if reflect.DeepEqual(oldObj, newObj) == false {
+                                       //Adding Pod
+                                       if _, ok := addMap[n.Name]; ok {
+                                               if kapi.IsPodReady(n) {
+                                                       delete(addMap, n.Name)
+                                                       sendPodWork(KubeWorkUpdatePod, queue, newObj)
+                                               }
+                                               return
+                                       }
+                                       //Deleting Pod
+                                       if _, ok := deleteMap[n.Name]; ok {
+                                               return
+                                       } else {
+                                               if o.ObjectMeta.DeletionTimestamp == nil &&
+                                                       n.ObjectMeta.DeletionTimestamp != nil {
+                                                       deleteMap[n.Name] = n
+                                                       return
+                                               }
+                                               //Updating Pod
+                                               sendPodWork(KubeWorkUpdatePod, queue, newObj)
+                                       }
+                               }
+                       },
+               },
+       )
+       stop := make(chan struct{})
+       go podController.Run(stop)
+}
+
+func runBookKeeper(workQue <-chan KubeWork, msbQueue chan<- MSBWork) {
+
+       client := newClientBookKeeper()
+       client.msbQueue = msbQueue
+
+       for work := range workQue {
+               switch work.Action {
+               case KubeWorkAddService:
+                       client.AddService(work.Service)
+               case KubeWorkRemoveService:
+                       client.RemoveService(work.Service)
+               case KubeWorkUpdateService:
+                       client.UpdateService(work.Service)
+               case KubeWorkAddPod:
+                       client.AddPod(work.Pod)
+               case KubeWorkRemovePod:
+                       client.RemovePod(work.Pod)
+               case KubeWorkUpdatePod:
+                       client.UpdatePod(work.Pod)
+               default:
+                       log.Println("Unsupported work action: ", work.Action)
+               }
+       }
+       log.Println("Completed all work")
+}
+
+func runMSBWorker(queue <-chan MSBWork, client Client) {
+       worker := newMSBAgentWorker(client)
+
+       for work := range queue {
+               log.Println("MSB Work Action: ", work.Action, " ServiceInfo:", work.ServiceInfo)
+
+               switch work.Action {
+               case MSBWorkAddService:
+                       worker.AddService(work.IPAddress, work.ServiceInfo)
+               case MSBWorkRemoveService:
+                       worker.RemoveService(work.IPAddress, work.ServiceInfo)
+               case MSBWorkAddPod:
+                       worker.AddPod(work.IPAddress, work.ServiceInfo)
+               case MSBWorkRemovePod:
+                       worker.RemovePod(work.IPAddress, work.ServiceInfo)
+               default:
+                       log.Println("Unsupported Action of: ", work.Action)
+               }
+       }
+}
+
+func main() {
+       flag.Parse()
+       var err error
+
+       msbClient, err := newMSBClient()
+       if err != nil {
+               log.Fatalf("Failed to create MSB client - %v", err)
+       }
+
+       kubeClient, err := newKubeClient()
+       if err != nil {
+               log.Fatalf("Failed to create a kubernetes client: %v", err)
+       }
+
+       if _, err := kubeClient.ServerVersion(); err != nil {
+               log.Fatal("Could not connect to Kube Master", err)
+       } else {
+               log.Println("Connected to K8S API Server")
+       }
+
+       kubeWorkQueue := make(chan KubeWork)
+       msbWorkQueue := make(chan MSBWork)
+       go runBookKeeper(kubeWorkQueue, msbWorkQueue)
+       watchForServices(kubeClient, kubeWorkQueue)
+       watchForPods(kubeClient, kubeWorkQueue)
+       go runMSBWorker(msbWorkQueue, msbClient)
+
+       // Prevent exit
+       select {}
+}
diff --git a/kube2msb/src/kube_work.go b/kube2msb/src/kube_work.go
new file mode 100644 (file)
index 0000000..4e99cbd
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+       "log"
+       "sync"
+
+       kapi "k8s.io/kubernetes/pkg/api"
+)
+
+type KubeBookKeeper interface {
+       AddService(*kapi.Service)
+       RemoveService(*kapi.Service)
+       UpdateService(*kapi.Service)
+       AddPod(*kapi.Pod)
+       RemovePod(*kapi.Pod)
+       UpdatePod(*kapi.Pod)
+}
+
+type ClientBookKeeper struct {
+       sync.Mutex
+       KubeBookKeeper
+       services map[string]*kapi.Service
+       pods     map[string]*kapi.Pod
+       msbQueue chan<- MSBWork
+}
+
+func newClientBookKeeper() *ClientBookKeeper {
+       return &ClientBookKeeper{
+               services: make(map[string]*kapi.Service),
+               pods:     make(map[string]*kapi.Pod),
+       }
+}
+
+func (client *ClientBookKeeper) AddService(svc *kapi.Service) {
+       client.Lock()
+       defer client.Unlock()
+       if _, ok := svc.ObjectMeta.Annotations[serviceKey]; !ok {
+               log.Println("Not the target, skip this ADD notification for service:", svc.Name)
+               return
+       }
+
+       if _, ok := client.services[svc.Name]; ok {
+               log.Printf("service:%s already exist. skip this ADD notification.", svc.Name)
+               return
+       }
+
+       if kapi.IsServiceIPSet(svc) {
+               if svc.Spec.Type == kapi.ServiceTypeClusterIP || svc.Spec.Type == kapi.ServiceTypeNodePort {
+                       log.Printf("Adding %s service:%s", svc.Spec.Type, svc.Name)
+                       client.msbQueue <- MSBWork{
+                               Action:      MSBWorkAddService,
+                               ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
+                               IPAddress:   svc.Spec.ClusterIP,
+                       }
+               } else if svc.Spec.Type == kapi.ServiceTypeLoadBalancer {
+                       log.Println("Adding LoadBalancerIP service:", svc.Name)
+                       client.msbQueue <- MSBWork{
+                               Action:      MSBWorkAddService,
+                               ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
+                               IPAddress:   svc.Spec.LoadBalancerIP,
+                       }
+               } else {
+                       log.Printf("Service Type:%s for Service:%s is not supported", svc.Spec.Type, svc.Name)
+                       return
+               }
+               client.services[svc.Name] = svc
+               log.Println("Queued Service to be added: ", svc.Name)
+       } else {
+               // if ClusterIP is not set, do not create a DNS records
+               log.Printf("Skipping dns record for headless service: %s\n", svc.Name)
+       }
+}
+
+func (client *ClientBookKeeper) RemoveService(svc *kapi.Service) {
+       client.Lock()
+       defer client.Unlock()
+       if _, ok := svc.ObjectMeta.Annotations[serviceKey]; !ok {
+               log.Println("Not the target, skip this Remove notification for service:", svc.Name)
+               return
+       }
+
+       if _, ok := client.services[svc.Name]; !ok {
+               log.Printf("Service:%s not exist. skip this REMOVE notification.", svc.Name)
+               return
+       }
+
+       if svc.Spec.Type == kapi.ServiceTypeClusterIP || svc.Spec.Type == kapi.ServiceTypeNodePort {
+               log.Printf("Removing %s service:%s", svc.Spec.Type, svc.Name)
+               //Perform All DNS Removes
+               client.msbQueue <- MSBWork{
+                       Action:      MSBWorkRemoveService,
+                       ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
+                       IPAddress:   svc.Spec.ClusterIP,
+               }
+       } else if svc.Spec.Type == kapi.ServiceTypeLoadBalancer {
+               log.Println("Removing LoadBalancerIP service:", svc.Name)
+               client.msbQueue <- MSBWork{
+                       Action:      MSBWorkRemoveService,
+                       ServiceInfo: svc.ObjectMeta.Annotations[serviceKey],
+                       IPAddress:   svc.Spec.LoadBalancerIP,
+               }
+       } else {
+               log.Printf("Service Type:%s for Service:%s is not supported", svc.Spec.Type, svc.Name)
+               return
+       }
+       delete(client.services, svc.Name)
+       log.Println("Queued Service to be removed: ", svc.Name)
+}
+
+func (client *ClientBookKeeper) UpdateService(svc *kapi.Service) {
+       if _, ok := svc.ObjectMeta.Annotations[serviceKey]; !ok {
+               log.Println("Not the target, skip this Update notification for service:", svc.Name)
+               return
+       }
+
+       client.RemoveService(svc)
+       client.AddService(svc)
+}
+
+func (client *ClientBookKeeper) AddPod(pod *kapi.Pod) {
+       client.Lock()
+       defer client.Unlock()
+       if _, ok := pod.Annotations[serviceKey]; !ok {
+               log.Println("Not the target, skip this ADD notification for pod:", pod.Name)
+               return
+       }
+
+       if _, ok := client.pods[pod.Name]; ok {
+               log.Printf("Pod:%s already exist. skip this ADD notification.", pod.Name)
+               return
+       }
+
+       //newly added Pod
+       if pod.Name == "" || pod.Status.PodIP == "" {
+               log.Printf("Pod:%s has neither name nor pod ip. skip this ADD notification.", pod.Name)
+               addMap[pod.Name] = pod
+               return
+       }
+
+       //Perform All DNS Adds
+       client.msbQueue <- MSBWork{
+               Action:      MSBWorkAddPod,
+               ServiceInfo: pod.Annotations[serviceKey],
+               IPAddress:   pod.Status.PodIP,
+       }
+       client.pods[pod.Name] = pod
+       log.Println("Queued Pod to be added: ", pod.Name)
+}
+
+func (client *ClientBookKeeper) RemovePod(pod *kapi.Pod) {
+       client.Lock()
+       defer client.Unlock()
+       if _, ok := pod.Annotations[serviceKey]; !ok {
+               log.Println("Not the target, skip this Remove notification for pod:", pod.Name)
+               return
+       }
+
+       if _, ok := client.pods[pod.Name]; !ok {
+               log.Printf("Pod:%s not exist. skip this REMOVE notification.", pod.Name)
+               return
+       }
+       //Perform All DNS Removes
+       client.msbQueue <- MSBWork{
+               Action:      MSBWorkRemovePod,
+               ServiceInfo: pod.Annotations[serviceKey],
+               IPAddress:   pod.Status.PodIP,
+       }
+       delete(client.pods, pod.Name)
+       log.Println("Queued Pod to be removed: ", pod.Name)
+}
+
+func (client *ClientBookKeeper) UpdatePod(pod *kapi.Pod) {
+       if _, ok := pod.Annotations[serviceKey]; !ok {
+               log.Println("Not the target, skip this Update notification for pod:", pod.Name)
+               return
+       }
+
+       client.RemovePod(pod)
+       client.AddPod(pod)
+}
diff --git a/kube2msb/src/msb_client.go b/kube2msb/src/msb_client.go
new file mode 100644 (file)
index 0000000..da0557a
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+       "bytes"
+       "encoding/json"
+       "fmt"
+       "log"
+       "net/http"
+)
+
+const (
+       urlPrefix = "/api/microservices/v1/services"
+)
+
+type Client interface {
+       Register(string)
+       DeRegister(string)
+}
+
+type MSBAgent struct {
+       Client
+       url string
+}
+
+func newMSBAgent(s string) (*MSBAgent, error) {
+       healthCheckURL := s + urlPrefix + "/health"
+       resp, err := http.Get(healthCheckURL)
+       if err != nil {
+               return nil, err
+       }
+
+       if resp.StatusCode != http.StatusOK {
+               return nil, fmt.Errorf("MSB agent:%s is not available", s)
+       }
+
+       return &MSBAgent{url: s}, nil
+}
+
+func (client *MSBAgent) Register(serviceInfo string) {
+       var (
+               sa = &ServiceAnnotation{}
+       )
+       err := json.Unmarshal([]byte(serviceInfo), sa)
+       if err != nil {
+               log.Printf("Failed to Unmarshal serviceInfo to ServiceAnnotation:%v", err)
+               return
+       }
+
+       su := ServiceAnnotation2ServiceUnit(sa)
+       body, _ := json.Marshal(su)
+       postURL := client.url + urlPrefix
+
+       resp, err := http.Post(postURL, "application/json", bytes.NewReader(body))
+       if err != nil {
+               log.Printf("Failed to do a request:%v", err)
+               return
+       }
+
+       log.Printf("Http request to register service:%s returned code:%d", su.Name, resp.StatusCode)
+}
+
+func (client *MSBAgent) DeRegister(serviceInfo string) {
+       var (
+               sa = &ServiceAnnotation{}
+       )
+
+       err := json.Unmarshal([]byte(serviceInfo), sa)
+       if err != nil {
+               log.Printf("Failed to Unmarshal serviceInfo to ServiceAnnotation:%v", err)
+               return
+       }
+
+       deleteURL := client.url + urlPrefix + "/" + sa.ServiceName + "/version/" + sa.Version + "/nodes/" + sa.IP + "/" + sa.Port
+
+       req, err := http.NewRequest("DELETE", deleteURL, nil)
+       if err != nil {
+               log.Printf("(deleteURL:%s) failed to NewRequest:%v", deleteURL, err)
+               return
+       }
+
+       c := &http.Client{}
+       resp, err := c.Do(req)
+       if err != nil {
+               log.Printf("(deleteURL:%s) failed to do a request:%v", deleteURL, err)
+               return
+       }
+       log.Printf("Http request to deregister service:%s returned code:%d", sa.ServiceName, resp.StatusCode)
+}
+
+func ServiceAnnotation2ServiceUnit(sa *ServiceAnnotation) *ServiceUnit {
+       if sa == nil {
+               return nil
+       }
+
+       return &ServiceUnit{
+               Name:        sa.ServiceName,
+               Version:     sa.Version,
+               URL:         sa.URL,
+               Protocol:    sa.Protocol,
+               LBPolicy:    sa.LBPolicy,
+               VisualRange: sa.VisualRange,
+               Instances:   []InstanceUnit{{ServiceIP: sa.IP, ServicePort: sa.Port}},
+       }
+}
diff --git a/kube2msb/src/msb_work.go b/kube2msb/src/msb_work.go
new file mode 100644 (file)
index 0000000..5c40bae
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package main
+
+import (
+       "log"
+       "strings"
+       "sync"
+)
+
+type MSBWorker interface {
+       AddService(string, string)
+       RemoveService(string)
+       AddPod(string, string)
+       RemovePod(string)
+}
+
+type MSBAgentWorker struct {
+       sync.Mutex
+       MSBWorker
+       agent Client
+}
+
+func newMSBAgentWorker(client Client) *MSBAgentWorker {
+       return &MSBAgentWorker{
+               agent: client,
+       }
+}
+
+func (client *MSBAgentWorker) AddService(ip, sInfo string) {
+       client.Lock()
+       defer client.Unlock()
+
+       if ip == "" || sInfo == "" {
+               log.Println("Service Info is not valid for AddService")
+               return
+       }
+
+       client.agent.Register(mergeIP(ip, sInfo))
+}
+
+func (client *MSBAgentWorker) RemoveService(ip, sInfo string) {
+       client.Lock()
+       defer client.Unlock()
+
+       if sInfo == "" {
+               log.Println("Service Info is not valid for RemoveService")
+               return
+       }
+
+       client.agent.DeRegister(mergeIP(ip, sInfo))
+}
+
+func (client *MSBAgentWorker) AddPod(ip, sInfo string) {
+       client.Lock()
+       defer client.Unlock()
+       if ip == "" || sInfo == "" {
+               log.Println("Service Info is not valid for AddPod")
+               return
+       }
+
+       client.agent.Register(mergeIP(ip, sInfo))
+}
+
+func (client *MSBAgentWorker) RemovePod(ip, sInfo string) {
+       client.Lock()
+       defer client.Unlock()
+       if sInfo == "" {
+               log.Println("Service Info is not valid for RemovePod")
+               return
+       }
+
+       client.agent.DeRegister(mergeIP(ip, sInfo))
+}
+
+func mergeIP(ip, sInfo string) string {
+       insert := "{\"ip\":\"" + ip + "\","
+       parts := strings.Split(sInfo, "{")
+       out := parts[0]
+       for i := 1; i < len(parts); i++ {
+               out += insert + parts[i]
+       }
+       return out
+}
diff --git a/kube2msb/src/types.go b/kube2msb/src/types.go
new file mode 100644 (file)
index 0000000..caaf34a
--- /dev/null
@@ -0,0 +1,135 @@
+/*
+Copyright 2017 ZTE, Inc. and others.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// types.go
+package main
+
+import (
+       kapi "k8s.io/kubernetes/pkg/api"
+)
+
+type KubeWorkAction string
+
+const (
+       KubeWorkAddService    KubeWorkAction = "AddService"
+       KubeWorkRemoveService KubeWorkAction = "RemoveService"
+       KubeWorkUpdateService KubeWorkAction = "UpdateService"
+       KubeWorkAddPod        KubeWorkAction = "AddPod"
+       KubeWorkRemovePod     KubeWorkAction = "RemovePod"
+       KubeWorkUpdatePod     KubeWorkAction = "UpdatePod"
+)
+
+type KubeWork struct {
+       Action  KubeWorkAction
+       Service *kapi.Service
+       Pod     *kapi.Pod
+}
+
+type MSBWorkAction string
+
+const (
+       MSBWorkAddService    MSBWorkAction = "AddService"
+       MSBWorkRemoveService MSBWorkAction = "RemoveService"
+       MSBWorkAddPod        MSBWorkAction = "AddPod"
+       MSBWorkRemovePod     MSBWorkAction = "RemovePod"
+)
+
+type MSBWork struct {
+       Action      MSBWorkAction
+       ServiceInfo string
+       IPAddress   string
+}
+
+const serviceKey = "msb.onap.org/service-info"
+
+type ServiceUnit struct {
+       Name             string         `json:"serviceName,omitempty"`
+       Version          string         `json:"version"`
+       URL              string         `json:"url"`
+       Protocol         string         `json:"protocol"`
+       VisualRange      string         `json:"visualRange"`
+       LBPolicy         string         `json:"lb_policy"`
+       PublishPort      string         `json:"publish_port"`
+       Namespace        string         `json:"namespace"`
+       NWPlaneType      string         `json:"network_plane_type"`
+       Host             string         `json:"host"`
+       SubDomain        string         `json:"subdomain,omitempty"`
+       Path             string         `json:"path"`
+       Instances        []InstanceUnit `json:"nodes"`
+       Metadata         []MetaUnit     `json:"metadata"`
+       Labels           []string       `json:"labels"`
+       SwaggerURL       string         `json:"swagger_url,omitempty"`
+       IsManual         bool           `json:"is_manual"`
+       EnableSSL        bool           `json:"enable_ssl"`
+       EnableTLS        bool           `json:"enable_tls"`
+       EnableReferMatch string         `json:"enable_refer_match"`
+       ProxyRule        Rules          `json:"proxy_rule,omitempty"`
+       RateLimiting     RateLimit      `json:"rate_limiting,omitempty"`
+}
+
+type InstanceUnit struct {
+       ServiceIP      string `json:"ip,omitempty"`
+       ServicePort    string `json:"port,omitempty"`
+       LBServerParams string `json:"lb_server_params,omitempty"`
+       CheckType      string `json:"checkType,omitempty"`
+       CheckURL       string `json:"checkUrl,omitempty"`
+       CheckInterval  string `json:"checkInterval,omitempty"`
+       CheckTTL       string `json:"ttl,omitempty"`
+       CheckTimeOut   string `json:"checkTimeOut,omitempty"`
+       HaRole         string `json:"ha_role,omitempty"`
+       ServiceID      string `json:"nodeId,omitempty"`
+       ServiceStatus  string `json:"status,omitempty"`
+       APPVersion     string `json:"appversion,omitempty"`
+}
+
+type MetaUnit struct {
+       Key   string `json:"key"`
+       Value string `json:"value"`
+}
+
+type Rules struct {
+       HTTPProxy   HTTPProxyRule   `json:"http_proxy,omitempty"`
+       StreamProxy StreamProxyRule `json:"stream_proxy,omitempty"`
+}
+
+type HTTPProxyRule struct {
+       SendTimeout string `json:"send_timeout,omitempty"`
+       ReadTimeout string `json:"read_timeout,omitempty"`
+}
+
+type StreamProxyRule struct {
+       ProxyTimeout  string `json:"proxy_timeout,omitempty"`
+       ProxyResponse string `json:"proxy_responses,omitempty"`
+}
+
+type RateLimit struct {
+       LimitReq LimitRequest `json:"limit_req,omitempty"`
+}
+
+type LimitRequest struct {
+       Rate  string `json:"rate,omitempty"`
+       Burst string `json:"burst,omitempty"`
+}
+
+type ServiceAnnotation struct {
+       IP          string `json:"ip,omitempty"`
+       Port        string `json:"port,omitempty"`
+       ServiceName string `json:"serviceName,omitempty"`
+       Version     string `json:"version,omitempty"`
+       URL         string `json:"url,omitempty"`
+       Protocol    string `json:"protocol,omitempty"`
+       LBPolicy    string `json:"lb_policy,omitempty"`
+       VisualRange string `json:"visualRange,omitempty"`
+}