monitor consul data change 35/58635/1
authorLvbo163 <lv.bo163@zte.com.cn>
Thu, 2 Aug 2018 09:04:51 +0000 (17:04 +0800)
committerLvbo163 <lv.bo163@zte.com.cn>
Thu, 2 Aug 2018 09:04:51 +0000 (17:04 +0800)
Issue-ID: MSB-248

Change-Id: I495adcc4556c017ad42655ffcead957af9a9c106
Signed-off-by: Lvbo163 <lv.bo163@zte.com.cn>
msb2pilot/src/msb2pilot/consul/monitor.go [new file with mode: 0644]
msb2pilot/src/msb2pilot/main.go

diff --git a/msb2pilot/src/msb2pilot/consul/monitor.go b/msb2pilot/src/msb2pilot/consul/monitor.go
new file mode 100644 (file)
index 0000000..c3adde4
--- /dev/null
@@ -0,0 +1,78 @@
+/**
+ * Copyright (c) 2018 ZTE Corporation.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * and the Apache License 2.0 which both accompany this distribution,
+ * and are available at http://www.eclipse.org/legal/epl-v10.html
+ * and http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Contributors:
+ *     ZTE - initial Project
+ */
+package consul
+
+import (
+       "msb2pilot/log"
+       "msb2pilot/models"
+       "time"
+
+       "github.com/hashicorp/consul/api"
+)
+
+type Monitor interface {
+       Start(<-chan struct{})
+}
+
+type ServiceHandler func(newServices []*models.MsbService)
+
+type consulMonitor struct {
+       discovery      *api.Client
+       period         time.Duration
+       serviceHandler ServiceHandler
+}
+
+func NewConsulMonitor(client *api.Client, period time.Duration, serviceHandler ServiceHandler) Monitor {
+       return &consulMonitor{
+               discovery:      client,
+               period:         period,
+               serviceHandler: serviceHandler,
+       }
+}
+
+func (this *consulMonitor) Start(stop <-chan struct{}) {
+       this.run(stop)
+}
+
+func (this *consulMonitor) run(stop <-chan struct{}) {
+       ticker := time.NewTicker(this.period)
+       for {
+               select {
+               case <-stop:
+                       ticker.Stop()
+                       return
+               case <-ticker.C:
+                       this.updateServiceRecord()
+               }
+       }
+
+}
+
+func (this *consulMonitor) updateServiceRecord() {
+       data, err := GetServices()
+       if err != nil {
+               log.Log.Error("failed to get services from consul", err)
+               return
+       }
+
+       newRecords := make([]*models.MsbService, 0, len(data))
+       for name := range data {
+               endpoints, err := GetInstances(name)
+               if err != nil {
+                       log.Log.Error("failed to get service instance of "+name, err)
+                       continue
+               }
+               newRecords = append(newRecords, models.ConvertService(endpoints))
+       }
+
+       this.serviceHandler(newRecords)
+}
index 236e369..7ed762c 100644 (file)
 package main
 
 import (
-       _ "msb2pilot/consul"
+       "fmt"
+       "msb2pilot/consul"
        "msb2pilot/log"
+       "msb2pilot/models"
        _ "msb2pilot/routers"
+       "time"
 
        "github.com/astaxie/beego"
 )
 
 func main() {
        log.Log.Informational("**************** init msb2pilot ************************")
+       // start sync msb data
+       go syncConsulData()
 
        beego.Run()
 }
+
+func syncConsulData() {
+       stop := make(chan struct{})
+       monitor := consul.NewConsulMonitor(nil, 20*time.Second, syncMsbData)
+       monitor.Start(stop)
+}
+
+func syncMsbData(newServices []*models.MsbService) {
+       fmt.Println(len(newServices), "services updated", time.Now())
+}