+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import traceback
+from threading import Thread
+
+from lcm.pub.utils.syscomm import fun_name
+from lcm.pub.utils.values import ignore_case_get
+from lcm.pub.utils import restcall
+from lcm.pub.exceptions import NSLCMException
+from lcm.workflows.graphflow.flow.flow import GraphFlow
+
+logger = logging.getLogger(__name__)
+
+RESULT_OK, RESULT_NG = "0", "1"
+JOB_ERROR = 255
+
+config = {
+ "CreateVnf": {"module": "lcm.ns_vnfs", "class": "CreateVnf"},
+ "CreatePnf": {"module": "lcm.ns_pnfs", "class": "CreatePnf"},
+ "CreateVl": {"module": "lcm.ns_vls", "class": "CreateVl"}
+}
+
+
+class NsInstantiateWorkflowThread(Thread):
+ def __init__(self, plan_input):
+ Thread.__init__(self)
+ self.plan_input = plan_input
+
+ def run(self):
+ run_ns_instantiate(self.plan_input)
+
+
+def run_ns_instantiate(input_data):
+ """
+ format of input_data
+ {
+ "jobId": uuid of job,
+ "nsInstanceId": id of ns instance,
+ "object_context": json format of nsd,
+ "object_additionalParamForNs": json format of additional parameters for ns,
+ "object_additionalParamForVnf": json format of additional parameters for vnf,
+ "object_additionalParamForPnf": json format of additional parameters for pnf,
+ "vlCount": int type of VL count,
+ "vnfCount: int type of VNF count
+ }
+ """
+ logger.debug("Enter %s, input_data is %s", fun_name(), input_data)
+ ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
+ job_id = ignore_case_get(input_data, "jobId")
+ update_job(job_id, 10, "true", "Start to prepare the NS instantiate workflow parameter")
+ deploy_graph = build_deploy_graph(input_data)
+ TaskSet = build_TaskSet(input_data)
+ ns_instantiate_ok = False
+
+ try:
+ update_job(job_id, 15, "true", "Start the NS instantiate workflow")
+ gf = GraphFlow(deploy_graph, TaskSet, config)
+ logger.debug("NS graph flow run up!")
+ gf.start()
+ gf.join()
+ gf.task_manager.wait_tasks_done(gf.sort_nodes)
+ if gf.task_manager.is_all_task_finished():
+ logger.debug("NS is instantiated!")
+ update_job(job_id, 90, "true", "Start to post deal")
+ post_deal(ns_inst_id, "true")
+ update_job(job_id, 100, "true", "Create NS successfully.")
+ ns_instantiate_ok = True
+ except NSLCMException as e:
+ logger.error("Failded to Create NS: %s", e.message)
+ update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
+ post_deal(ns_inst_id, "false")
+ except:
+ logger.error(traceback.format_exc())
+ update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
+ post_deal(ns_inst_id, "false")
+ return ns_instantiate_ok
+
+
+def build_deploy_graph(input_data):
+ nsd_json_str = ignore_case_get(input_data, "object_context")
+ nsd_json = json.JSONDecoder().decode(nsd_json_str)
+ deploy_graph = ignore_case_get(nsd_json, "graph")
+ logger.debug("NS graph flow: %s" % deploy_graph)
+ return deploy_graph
+
+
+def build_vls(input_data):
+ ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
+ nsd_json = json.JSONDecoder().decode(ignore_case_get(input_data, "object_context"))
+ ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs")
+ vl_count = int(ignore_case_get(input_data, "vlCount", 0))
+
+ vls = {}
+ for i in range(vl_count):
+ data = {
+ "nsInstanceId": ns_inst_id,
+ "vlIndex": i,
+ "context": nsd_json,
+ "additionalParamForNs": ns_param_json
+ }
+ key = nsd_json["vls"][i - 1]["vl_id"]
+ vls[key] = {
+ "type": "CreateVl",
+ "input": {
+ "content": data
+ }
+ }
+ return vls
+
+
+def build_vnfs(input_data):
+ ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
+ vnf_count = int(ignore_case_get(input_data, "vnfCount", 0))
+ vnf_param_json = json.JSONDecoder().decode(ignore_case_get(input_data, "object_additionalParamForVnf"))
+ vnfs = {}
+ for i in range(vnf_count):
+ data = {
+ "nsInstanceId": ns_inst_id,
+ "vnfIndex": i,
+ "additionalParamForVnf": vnf_param_json
+ }
+ key = vnf_param_json[i - 1]["vnfProfileId"]
+ vnfs[key] = {
+ "type": "CreateVnf",
+ "input": {
+ "content": data
+ }
+ }
+ return vnfs
+
+
+def build_pnfs(input_data):
+ return json.JSONDecoder().decode(ignore_case_get(input_data, "object_additionalParamForPnf"))
+
+
+def build_TaskSet(input_data):
+ vls = build_vls(input_data)
+ vnfs = build_vnfs(input_data)
+ pnfs = build_pnfs(input_data)
+ task_set = dict(dict(vls, **vnfs), **pnfs)
+ return task_set
+
+
+def post_deal(ns_inst_id, status):
+ uri = "api/nslcm/v1/ns/{nsInstanceId}/postdeal".format(nsInstanceId=ns_inst_id)
+ data = json.JSONEncoder().encode({
+ "status": status
+ })
+
+ ret = restcall.req_by_msb(uri, "POST", data)
+ if ret[0] != 0:
+ logger.error("Failed to call post_deal(%s): %s", ns_inst_id, ret[1])
+ logger.debug("Call post_deal(%s, %s) successfully.", ns_inst_id, status)
+
+
+def update_job(job_id, progress, errcode, desc):
+ logger.debug("job_id %s" % job_id)
+ uri = "api/nslcm/v1/jobs/{jobId}".format(jobId=job_id)
+ data = json.JSONEncoder().encode({
+ "progress": progress,
+ "errcode": errcode,
+ "desc": desc
+ })
+ ret = restcall.req_by_msb(uri, "POST", data)
+ return ret