Add logic to handle single backlog for ocata 11/55811/1
authorYun Huang <yun.huang@windriver.com>
Wed, 4 Jul 2018 09:56:16 +0000 (17:56 +0800)
committerYun Huang <yun.huang@windriver.com>
Wed, 4 Jul 2018 09:56:16 +0000 (17:56 +0800)
Dispatch the specific backlog to corresponding handler
caching the runtime state into cache

Change-Id: I3cd54f5e0b15210cf90c8d14b0ec423af858c0ac
Issue-ID: MULTICLOUD-230
Signed-off-by: Yun Huang <yun.huang@windriver.com>
ocata/ocata/vesagent/event_domain/fault_vm.py
ocata/ocata/vesagent/tasks.py

index 314847c..5ece71a 100644 (file)
@@ -108,3 +108,12 @@ def buildBacklog_fault_vm(vimid, backlog_input):
     logger.debug("with backlog: %s" % backlog)
     return backlog
 
+
+### process backlog with domain:"fault", type:"vm"
+
+def processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog):
+    logger.debug("vesAgentConfig:%s, vesAgentState:%s, oneBacklog: %s"
+                 % (vesAgentConfig, vesAgentState, oneBacklog))
+
+    return
+
index 2c94c8e..21f1e20 100644 (file)
@@ -22,6 +22,8 @@ import time
 
 from django.core.cache import cache
 
+from ocata.vesagent.event_domain.fault_vm import processBacklog_fault_vm
+
 logger = logging.getLogger(__name__)
 
 
@@ -83,7 +85,7 @@ def processBacklogsOfOneVIM(vimid):
     :param vimid:
     :return:
     '''
-    backlog_count = 3
+    backlog_count = 0
     next_time_slot = 10
 
     try:
@@ -100,9 +102,95 @@ def processBacklogsOfOneVIM(vimid):
             return 0,next_time_slot
 
 
+        vesAgentStateStr = cache.get("VesAgentBacklogs.state.%s" % (vimid))
+        vesAgentState = json.loads(vesAgentStateStr) if vesAgentStateStr is not None else {}
+
+        ves_info = vesAgentConfig.get("subscription", None)
+        if ves_info is None:
+            logger.warn("VesAgentBacklogs.config.%s: ves subscription corrupts:%s" % (vimid, vesAgentConfigStr))
+            return 0,next_time_slot
+
+        poll_interval_default = vesAgentConfig.get("poll_interval_default", None)
+        if poll_interval_default is None:
+            logger.warn("VesAgentBacklogs.config.%s: poll_interval_default corrupts:%s" % (vimid, vesAgentConfigStr))
+            return 0,next_time_slot
+
+        if poll_interval_default == 0:
+            # invalid interval value
+            logger.warn("VesAgentBacklogs.config.%s: poll_interval_default invalid:%s" % (vimid, vesAgentConfigStr))
+            return 0,next_time_slot
+
+        backlogs_list = vesAgentConfig.get("backlogs", None)
+        if backlogs_list is None:
+            logger.warn("VesAgentBacklogs.config.%s: backlogs corrupts:%s" % (vimid, vesAgentConfigStr))
+            return 0,next_time_slot
+
+        for backlog in backlogs_list:
+            backlog_count_tmp, next_time_slot_tmp = processOneBacklog(
+                               vesAgentConfig, vesAgentState, poll_interval_default, backlog)
+            logger.debug("processOneBacklog return with %s,%s" % (backlog_count_tmp, next_time_slot_tmp))
+            backlog_count += backlog_count_tmp
+            next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot
+
+            pass
+
+        # save back the updated backlogs state
+        vesAgentStateStr = json.dumps(vesAgentState)
+        cache.set("VesAgentBacklogs.state.%s" % vimid, vesAgentStateStr, None)
+
+    except Exception as e:
+        logger.error("exception:%s" % str(e))
+
+    return backlog_count, next_time_slot
+
+
+def processOneBacklog(vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog):
+    logger.info("Process one backlog")
+    #logger.debug("vesAgentConfig:%s, vesAgentState:%s, poll_interval_default:%s, oneBacklog: %s"
+    #             % (vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog))
+
+    backlog_count = 1
+    next_time_slot = 10
+    try:
+        timestamp_now = int(time.time())
+        backlog_uuid = oneBacklog.get("backlog_uuid", None)
+        if backlog_uuid is None:
+            # warning: uuid is None, omit this backlog
+            logger.warn("backlog without uuid: %s" % oneBacklog)
+            return 0, next_time_slot
+
+        backlogState = vesAgentState.get("%s" % (backlog_uuid), None)
+        if backlogState is None:
+            initialBacklogState = {
+                "timestamp": timestamp_now
+            }
+            vesAgentState["%s" % (backlog_uuid)] = initialBacklogState
+            backlogState = initialBacklogState
+
+        time_expiration = backlogState["timestamp"] \
+                          + oneBacklog.get("poll_interval", poll_interval_default)
+        # check if poll interval expires
+        if timestamp_now < time_expiration:
+            # not expired yet
+            logger.info("return without dispatching, not expired yet")
+            return backlog_count, next_time_slot
+
+        logger.info("Dispatching backlog")
+
+        # collect data in case of expiration
+        if oneBacklog["domain"] == "fault" and oneBacklog["type"] == "vm":
+            processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog)
+        else:
+            logger.warn("Dispatching backlog fails due to unsupported backlog domain %s,type:%s"
+                        % (oneBacklog["domain"], oneBacklog["type"]))
+            backlog_count = 0
+            pass
 
+        # update timestamp and internal state
+        backlogState["timestamp"] = timestamp_now
     except Exception as e:
         logger.error("exception:%s" % str(e))
 
+    logger.info("return")
     return backlog_count, next_time_slot