1 # Copyright 2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
16 from threading import Thread
19 from lcm.pub.utils.syscomm import fun_name
20 from lcm.pub.utils.values import ignore_case_get
21 from lcm.pub.utils import restcall
22 from lcm.pub.exceptions import NSLCMException
24 logger = logging.getLogger(__name__)
26 RESULT_OK, RESULT_NG = "0", "1"
35 "nsInstanceId": id of ns instance,
36 "object_context": json format of nsd,
37 "object_additionalParamForNs": json format of additional parameters for ns,
38 "object_additionalParamForVnf": json format of additional parameters for vnf,
39 "vlCount": int type of VL count,
40 "vnfCount: int type of VNF count,
41 "sfcCount": int type of SFC count,
42 "sdnControllerId": uuid of SDN controller
45 def run_ns_instantiate(input_data):
46 logger.debug("Enter %s, input_data is %s", fun_name(), input_data)
47 job_id = ignore_case_get(input_data, "jobId")
48 ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
49 nsd_json = ignore_case_get(input_data, "object_context")
50 ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs")
51 vnf_param_json = ignore_case_get(input_data, "object_additionalParamForVnf")
52 vl_count = ignore_case_get(input_data, "vlCount")
53 vnf_count = ignore_case_get(input_data, "vnfCount")
54 sfc_count = ignore_case_get(input_data, "sfcCount")
55 sdnc_id = ignore_case_get(input_data, "sdnControllerId")
56 g_jobs_status[job_id] = [1 for i in range(vnf_count)]
58 update_job(job_id, 10, "0", "Start to create VL")
59 for i in range(vl_count):
60 create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json)
62 update_job(job_id, 30, "0", "Start to create VNF")
63 jobs = [create_vnf(ns_inst_id, i + 1, vnf_param_json) for i in range(vnf_count)]
64 wait_until_jobs_done(job_id, jobs)
66 update_job(job_id, 70, "0", "Start to create SFC")
67 jobs = [create_sfc(ns_inst_id, i + 1, nsd_json, sdnc_id) for i in range(sfc_count)]
68 wait_until_jobs_done(job_id, jobs)
70 update_job(job_id, 90, "0", "Start to post deal")
71 post_deal(ns_inst_id, "true")
73 update_job(job_id, 100, "0", "Create NS successfully.")
74 except NSLCMException as e:
75 logger.error("Failded to Create NS: %s", e.message)
76 update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
77 post_deal(ns_inst_id, "false")
79 logger.error(traceback.format_exc())
80 update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
81 post_deal(ns_inst_id, "false")
83 g_jobs_status.pop(job_id)
86 def create_vl(ns_inst_id, vl_index, nsd, ns_param):
87 uri = "api/nslcm/v1/ns/vls"
88 data = json.JSONEncoder().encode({
89 "nsInstanceId": ns_inst_id,
92 "additionalParamForNs": ns_param
95 ret = restcall.req_by_msb(uri, "POST", data)
97 logger.error("Failed to call create_vl(%s): %s", vl_index, ret[1])
98 raise NSLCMException("Failed to call create_vl(index is %s)" % vl_index)
100 result = str(ret[1]["result"])
101 detail = ret[1]["detail"]
102 vl_id = ret[1]["vlId"]
103 if result != RESULT_OK:
104 logger.error("Failed to create VL(%s): %s", vl_id, detail)
105 raise NSLCMException("Failed to create VL(%s)" % vl_id)
107 logger.debug("Create VL(%s) successfully.", vl_id)
109 def create_vnf(ns_inst_id, vnf_index, nf_param):
111 data = json.JSONEncoder().encode({
112 "nsInstanceId": ns_inst_id,
113 "vnfIndex": vnf_index,
114 "additionalParamForVnf": nf_param
117 ret = restcall.req_by_msb(uri, "POST", data)
119 logger.error("Failed to call create_vnf(%s): %s", vnf_index, ret[1])
120 raise NSLCMException("Failed to call create_vnf(index is %s)" % vnf_index)
122 vnf_inst_id = ret[1]["vnfInstId"]
123 job_id = ret[1]["jobId"]
124 logger.debug("Create VNF(%s) started.", vnf_inst_id)
125 return vnf_inst_id, job_id, vnf_index - 1
127 def create_sfc(ns_inst_id, fp_index, nsd_json, sdnc_id):
129 data = json.JSONEncoder().encode({
130 "nsInstanceId": ns_inst_id,
133 "sdnControllerId": sdnc_id
136 ret = restcall.req_by_msb(uri, "POST", data)
138 logger.error("Failed to call create_sfc(%s): %s", fp_index, ret[1])
139 raise NSLCMException("Failed to call create_sfc(index is %s)" % fp_index)
141 sfc_inst_id = ret[1]["sfcInstId"]
142 job_id = ret[1]["jobId"]
143 logger.debug("Create SFC(%s) started.", sfc_inst_id)
144 return sfc_inst_id, job_id, fp_index - 1
146 def post_deal(ns_inst_id, status):
147 uri = "/ns/{nsInstanceId}/postdeal".format(nsInstanceId=ns_inst_id)
148 data = json.JSONEncoder().encode({
152 ret = restcall.req_by_msb(uri, "POST", data)
154 logger.error("Failed to call post_deal(%s): %s", ns_inst_id, ret[1])
155 logger.debug("Call post_deal(%s) successfully.", ns_inst_id)
157 def update_job(job_id, progress, errcode, desc):
158 uri = "api/nslcm/v1/jobs/{jobId}".format(jobId=job_id)
159 data = json.JSONEncoder().encode({
160 "progress": progress,
164 restcall.req_by_msb(uri, "POST", data)
166 class JobWaitThread(Thread):
171 def __init__(self, inst_id, job_id, index):
172 Thread.__init__(self)
173 self.inst_id = inst_id
176 self.retry_count = 60
177 self.interval_second = 3
181 response_id, new_response_id = 0, 0
182 job_end_normal, job_timeout = False, True
183 while count < self.retry_count:
185 time.sleep(self.interval_second)
186 uri = "/api/nslcm/v1/jobs/%s?responseId=%s" % (self.job_id, response_id)
187 ret = restcall.req_by_msb(uri, "GET")
189 logger.error("Failed to query job: %s:%s", ret[2], ret[1])
191 job_result = json.JSONDecoder().decode(ret[1])
192 if "responseDescriptor" not in job_result:
193 logger.error("Job(%s) does not exist.", self.job_id)
195 progress = job_result["responseDescriptor"]["progress"]
196 new_response_id = job_result["responseDescriptor"]["responseId"]
197 job_desc = job_result["responseDescriptor"]["statusDescription"]
198 if new_response_id != response_id:
199 logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
200 response_id = new_response_id
202 if progress == JOB_ERROR:
204 logger.error("Job(%s) failed: %s", self.job_id, job_desc)
206 elif progress == 100:
207 job_end_normal, job_timeout = True, False
208 logger.info("Job(%s) ended normally", self.job_id)
211 logger.error("Job(%s) timeout", self.job_id)
212 if self.job_id in g_jobs_status:
214 g_jobs_status[self.job_id][self.index] = 0
216 def wait_until_jobs_done(g_job_id, jobs):
218 for inst_id, job_id, index in jobs:
219 job_threads.append(JobWaitThread(inst_id, job_id, index))
220 for t in job_threads:
222 for t in job_threads:
224 if g_job_id in g_jobs_status:
225 if sum(g_jobs_status[g_job_id]) > 0:
226 raise NSLCMException("Some jobs failed!")