Add create vnf of build-in workflow 03/8403/1
authorfujinhua <fu.jinhua@zte.com.cn>
Wed, 23 Aug 2017 09:34:41 +0000 (17:34 +0800)
committerfujinhua <fu.jinhua@zte.com.cn>
Wed, 23 Aug 2017 09:34:41 +0000 (17:34 +0800)
Change-Id: Idd15d70ec322ee00f27d6e233ba81f18570ad9bf
Issue-Id: VFC-132
Signed-off-by: fujinhua <fu.jinhua@zte.com.cn>
lcm/workflows/build_in.py

index b24c7af..1e4b292 100644 (file)
@@ -13,6 +13,8 @@
 # limitations under the License.
 import logging
 import traceback
+from threading import Thread
+import time
 
 from lcm.pub.utils.syscomm import fun_name
 from lcm.pub.utils.values import ignore_case_get
@@ -22,6 +24,9 @@ from lcm.pub.exceptions import NSLCMException
 logger = logging.getLogger(__name__)
 
 RESULT_OK, RESULT_NG = "0", "1"
+JOB_ERROR = 255
+
+g_jobs_status = {}
 
 """
 format of input_data
@@ -48,25 +53,34 @@ def run_ns_instantiate(input_data):
     vnf_count = ignore_case_get(input_data, "vnfCount")
     sfc_count = ignore_case_get(input_data, "sfcCount")
     sdnc_id = ignore_case_get(input_data, "sdnControllerId")
-    
-    update_job(job_id, 10, "0", "Start to create VL")
-    for i in range(vl_count):
-        create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json)
+    g_jobs_status[job_id] = [1 for i in range(vnf_count)]
+    try:
+        update_job(job_id, 10, "0", "Start to create VL")
+        for i in range(vl_count):
+            create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json)
+
+        update_job(job_id, 30, "0", "Start to create VNF")
+        jobs = [create_vnf(ns_inst_id, i + 1, vnf_param_json) for i in range(vnf_count)] 
+        wait_until_jobs_done(job_id, jobs)
 
-    update_job(job_id, 30, "0", "Start to create VNF")
-    for i in range(vnf_count):
-        create_vnf()
-    wait_until_job_done()
+        update_job(job_id, 70, "0", "Start to create SFC")
+        for i in range(sfc_count):
+            create_sfc()
 
-    update_job(job_id, 70, "0", "Start to create SFC")
-    for i in range(sfc_count):
-        create_sfc()
-    wait_until_job_done()
+        update_job(job_id, 90, "0", "Start to post deal")
+        post_deal()
+
+        update_job(job_id, 100, "0", "Create NS successfully.")
+    except NSLCMException as e:
+        logger.error("Failded to Create NS: %s", e.message)
+        update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
+    except:
+        logger.error(traceback.format_exc())
+        update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
+    finally:
+        g_jobs_status.pop(job_id)
 
-    update_job(job_id, 90, "0", "Start to post deal")
-    post_deal()
 
-    update_job(job_id, 100, "0", "Create NS successfully.")
 
 def create_vl(ns_inst_id, vl_index, nsd, ns_param):
     uri = "api/nslcm/v1/ns/vls"
@@ -91,9 +105,23 @@ def create_vl(ns_inst_id, vl_index, nsd, ns_param):
 
     logger.debug("Create VL(%s) successfully.", vl_id)
 
-def create_vnf():
-    # TODO:
-    pass
+def create_vnf(ns_inst_id, vnf_index, nf_param):
+    uri = "/ns/vnfs"
+    data = json.JSONEncoder().encode({
+        "nsInstanceId": ns_inst_id,
+        "vnfIndex": vnf_index,
+        "additionalParamForVnf": nf_param
+    })
+
+    ret = restcall.req_by_msb(uri, "POST", data)
+    if ret[0] != 0:
+        logger.error("Failed to call create_vnf(%s): %s", vnf_index, ret[1])
+        raise NSLCMException("Failed to call create_vnf(index is %s)" % vnf_index)
+
+    vnf_inst_id = ret[1]["vnfInstId"]
+    job_id = ret[1]["jobId"]
+    logger.debug("Create VNF(%s) started.", vnf_inst_id)
+    return vnf_inst_id, job_id, vnf_index - 1
 
 def create_sfc():
     # TODO:
@@ -112,6 +140,66 @@ def update_job(job_id, progress, errcode, desc):
     })
     restcall.req_by_msb(uri, "POST", data)  
 
-def wait_until_job_done():
-    # TODO:
-    pass  
+class JobWaitThread(Thread):
+    """
+    Job Wait 
+    """
+
+    def __init__(self, inst_id, job_id, index):
+        Thread.__init__(self)
+        self.inst_id = inst_id
+        self.job_id = job_id
+        self.index = index
+        self.retry_count = 60
+        self.interval_second = 3
+
+    def run(self):
+        count = 0
+        response_id, new_response_id = 0, 0
+        job_end_normal, job_timeout = False, True
+        while count < self.retry_count:
+            count = count + 1
+            time.sleep(self.interval_second)
+            uri = "/api/nslcm/v1/jobs/%s?responseId=%s" % (self.job_id, response_id)
+            ret = restcall.req_by_msb(uri, "GET")
+            if ret[0] != 0:
+                logger.error("Failed to query job: %s:%s", ret[2], ret[1])
+                continue
+            job_result = json.JSONDecoder().decode(ret[1])
+            if "responseDescriptor" not in job_result:
+                logger.error("Job(%s) does not exist.", self.job_id)
+                continue
+            progress = job_result["responseDescriptor"]["progress"]
+            new_response_id = job_result["responseDescriptor"]["responseId"]
+            job_desc = job_result["responseDescriptor"]["statusDescription"]
+            if new_response_id != response_id:
+                logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
+                response_id = new_response_id
+                count = 0
+            if progress == JOB_ERROR:
+                job_timeout = False
+                logger.error("Job(%s) failed: %s", self.job_id, job_desc)
+                break
+            elif progress == 100:
+                job_end_normal, job_timeout = True, False
+                logger.info("Job(%s) ended normally", self.job_id)
+                break
+        if job_timeout:
+            logger.error("Job(%s) timeout", self.job_id)
+        if self.job_id in g_jobs_status:
+            if job_end_normal:
+                g_jobs_status[self.job_id][self.index] = 0
+
+def wait_until_jobs_done(g_job_id, jobs):
+    job_threads = []
+    for inst_id, job_id, index in jobs:
+        job_threads.append(JobWaitThread(inst_id, job_id, index))
+    for t in job_threads:
+        t.start()
+    for t in job_threads:
+        t.join()
+    if g_job_id in g_jobs_status:
+        if sum(g_jobs_status[g_job_id]) > 0:
+            raise NSLCMException("Some jobs failed!")
+      
+