1 # Copyright 2018 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from threading import Thread
20 from lcm.pub.utils.syscomm import fun_name
21 from lcm.pub.utils.values import ignore_case_get
22 from lcm.pub.utils import restcall
23 from lcm.jobs.enum import JOB_PROGRESS, JOB_ERROR_CODE
24 from lcm.pub.exceptions import NSLCMException
25 from lcm.workflows.graphflow.flow.flow import GraphFlow
26 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
29 logger = logging.getLogger(__name__)
32 "CreateVnf": {"module": "lcm.ns_vnfs", "class": "CreateVnf"},
33 "CreatePnf": {"module": "lcm.ns_pnfs", "class": "CreatePnf"},
34 "CreateVl": {"module": "lcm.ns_vls", "class": "CreateVl"}
38 class NsInstantiateWorkflowThread(Thread):
39 def __init__(self, plan_input, occ_id):
41 self.plan_input = plan_input
45 run_ns_instantiate(self.plan_input, self.occ_id)
48 def run_ns_instantiate(input_data, occ_id):
53 "nsInstanceId": id of ns instance,
54 "object_context": json format of nsd,
55 "object_additionalParamForNs": json format of additional parameters for ns,
56 "object_additionalParamForVnf": json format of additional parameters for vnf,
57 "object_additionalParamForPnf": json format of additional parameters for pnf,
58 "vlCount": int type of VL count,
59 "vnfCount: int type of VNF count
62 logger.debug("Enter %s, input_data is %s", fun_name(), input_data)
63 ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
64 job_id = ignore_case_get(input_data, "jobId")
65 update_job(job_id, 10, JOB_ERROR_CODE.NO_ERROR, "Start to prepare the NS instantiate workflow parameter")
66 deploy_graph = build_deploy_graph(input_data)
67 TaskSet = build_TaskSet(input_data)
68 ns_instantiate_ok = False
71 update_job(job_id, 15, "true", "Start the NS instantiate workflow")
72 gf = GraphFlow(deploy_graph, TaskSet, config)
73 logger.debug("NS graph flow run up!")
76 gf.task_manager.wait_tasks_done(gf.sort_nodes)
77 if gf.task_manager.is_all_task_finished():
78 logger.debug("NS is instantiated!")
79 update_job(job_id, 90, JOB_ERROR_CODE.NO_ERROR, "Start to post deal")
80 post_deal(ns_inst_id, "true")
81 update_job(job_id, JOB_PROGRESS.FINISHED, JOB_ERROR_CODE.NO_ERROR, "Create NS successfully.")
82 NsLcmOpOcc.update(occ_id, "COMPLETED")
83 ns_instantiate_ok = True
84 except NSLCMException as e:
85 logger.error("Failded to Create NS: %s", e.message)
86 update_job(job_id, JOB_PROGRESS.ERROR, JOB_ERROR_CODE.ERROR, "Failded to Create NS.")
87 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
88 post_deal(ns_inst_id, "false")
89 except Exception as e:
90 logger.error(traceback.format_exc())
91 update_job(job_id, JOB_PROGRESS.ERROR, JOB_ERROR_CODE.ERROR, "Failded to Create NS.")
92 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
93 post_deal(ns_inst_id, "false")
94 return ns_instantiate_ok
97 def build_deploy_graph(input_data):
98 nsd_json_str = ignore_case_get(input_data, "object_context")
99 nsd_json = json.JSONDecoder().decode(nsd_json_str)
100 deploy_graph = ignore_case_get(nsd_json, "graph")
101 logger.debug("NS graph flow: %s" % deploy_graph)
105 def build_vls(input_data):
106 ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
107 nsd_json = json.JSONDecoder().decode(ignore_case_get(input_data, "object_context"))
108 ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs")
109 vl_count = int(ignore_case_get(input_data, "vlCount", 0))
112 for i in range(vl_count):
114 "nsInstanceId": ns_inst_id,
117 "additionalParamForNs": ns_param_json
119 key = nsd_json["vls"][i - 1]["vl_id"]
129 def build_vnfs(input_data):
130 ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
131 vnf_count = int(ignore_case_get(input_data, "vnfCount", 0))
132 vnf_param_json = json.JSONDecoder().decode(ignore_case_get(input_data, "object_additionalParamForVnf"))
134 for i in range(vnf_count):
136 "nsInstanceId": ns_inst_id,
138 "additionalParamForVnf": vnf_param_json
140 key = vnf_param_json[i - 1]["vnfProfileId"]
150 def build_pnfs(input_data):
151 return json.JSONDecoder().decode(ignore_case_get(input_data, "object_additionalParamForPnf"))
154 def build_TaskSet(input_data):
155 vls = build_vls(input_data)
156 vnfs = build_vnfs(input_data)
157 pnfs = build_pnfs(input_data)
158 task_set = dict(dict(vls, **vnfs), **pnfs)
162 def post_deal(ns_inst_id, status):
163 uri = "api/nslcm/v1/ns/{nsInstanceId}/postdeal".format(nsInstanceId=ns_inst_id)
164 data = json.JSONEncoder().encode({
168 ret = restcall.req_by_msb(uri, "POST", data)
170 logger.error("Failed to call post_deal(%s): %s", ns_inst_id, ret[1])
171 logger.debug("Call post_deal(%s, %s) successfully.", ns_inst_id, status)
174 def update_job(job_id, progress, errcode, desc):
175 logger.debug("job_id %s" % job_id)
176 uri = "api/nslcm/v1/jobs/{jobId}".format(jobId=job_id)
177 data = json.JSONEncoder().encode({
178 "progress": progress,
182 ret = restcall.req_by_msb(uri, "POST", data)