1 # Copyright 2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from lcm.jobs.const import JOB_INSTANCE_RESPONSE_ID_URI
21 from lcm.pub.exceptions import NSLCMException
22 from lcm.pub.utils.jobutil import JobUtil
23 from lcm.jobs.enum import JOB_TYPE, JOB_PROGRESS, JOB_ACTION
24 from lcm.pub.utils.values import ignore_case_get
25 from lcm.pub.utils.restcall import req_by_msb
28 logger = logging.getLogger(__name__)
31 class VerifyVnfs(threading.Thread):
32 def __init__(self, data, job_id):
33 super(VerifyVnfs, self).__init__()
37 self.verify_ok = False
38 self.verify_config = ''
42 self.verify_config = self.load_config()
43 JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.CREATE, self.job_id, 'vnfsdk', self.job_id)
48 except NSLCMException as e:
49 self.update_job(JOB_PROGRESS.ERROR, e.args[0])
51 logger.error(traceback.format_exc())
52 self.update_job(JOB_PROGRESS.ERROR, 'Unknown error in vnf verify.')
54 logger.warn("Ignore terminate vnf operation")
56 self.update_job(100, "Ignore terminate vnf operation.")
58 def do_on_boarding(self):
59 self.update_job(10, "Start vnf on boarding.")
61 "csarId": self.data["PackageID"],
62 "labVimId": ignore_case_get(self.verify_config, "labVimId")
64 ret = req_by_msb("/api/nslcm/v1/vnfpackage", "POST", json.JSONEncoder().encode(onboarding_data))
66 raise NSLCMException("Failed to call vnf onboarding: %s" % ret[1])
67 rsp_data = json.JSONDecoder().decode(ret[1])
68 if not self.wait_until_job_done(rsp_data["jobId"], 15):
69 raise NSLCMException("Vnf onboarding failed")
70 self.update_job(20, "Vnf on boarding success.")
72 def do_inst_vnf(self):
73 self.update_job(30, "Start inst vnf.")
74 vnf_param = ignore_case_get(self.verify_config, "additionalParamForVnf")
75 if vnf_param and "additionalParam" in vnf_param[0]:
76 vnf_param[0]["additionalParam"]["vimId"] = ignore_case_get(self.verify_config, "lab_vim_id")
79 "additionalParamForVnf": vnf_param,
82 ret = req_by_msb("/api/nslcm/v1/ns/ns_vnfs", "POST", json.JSONEncoder().encode(inst_data))
84 raise NSLCMException("Failed to call inst vnf: %s" % ret[1])
85 rsp_data = json.JSONDecoder().decode(ret[1])
86 self.vnf_inst_id = rsp_data["vnfInstId"]
87 if not self.wait_until_job_done(rsp_data["jobId"], 40):
88 raise NSLCMException("Vnf(%s) inst failed" % self.vnf_inst_id)
89 self.update_job(50, "Inst vnf success.")
91 def do_func_test(self):
92 self.update_job(60, "Start vnf function test.")
93 func_data = {"PackageID": self.data["PackageID"]}
94 ret = req_by_msb("/openapi/vnfsdk/v1/functest/taskmanager/testtasks", "POST", json.JSONEncoder().encode(func_data))
96 raise NSLCMException("Failed to call func test: %s" % ret[1])
97 rsp_data = json.JSONDecoder().decode(ret[1])
99 if not self.wait_func_test_job_done(rsp_data["TaskID"], 40):
100 raise NSLCMException("Func test failed")
101 logger.info("Query(%s) job success.", rsp_data["TaskID"])
103 ret = req_by_msb("/openapi/vnfsdk/v1/functest/taskmanager/testtasks/%s/result" % rsp_data["TaskID"], "GET")
105 raise NSLCMException("Failed to get func test result: %s" % ret[1])
106 rsp_result_data = json.JSONDecoder().decode(ret[1])
107 logger.info("Func test(%s) result: %s", rsp_result_data)
108 self.update_job(80, "Vnf function test success.")
110 def do_term_vnf(self):
111 if not self.vnf_inst_id:
113 self.update_job(90, "Start term vnf.")
115 "terminationType": "forceful",
116 "gracefulTerminationTimeout": "600"
118 ret = req_by_msb("/api/nslcm/v1/ns/ns_vnfs/%s" % self.vnf_inst_id, "POST", json.JSONEncoder().encode(term_data))
120 raise NSLCMException("Failed to call term vnf: %s" % ret[1])
121 rsp_data = json.JSONDecoder().decode(ret[1])
122 end_progress = 100 if self.verify_ok else JOB_PROGRESS.ERROR
123 term_progress = 95 if self.verify_ok else JOB_PROGRESS.ERROR
124 if not self.wait_until_job_done(rsp_data["jobId"], term_progress):
125 logger.error("Vnf(%s) term failed", self.vnf_inst_id)
126 end_progress = JOB_PROGRESS.ERROR
127 self.update_job(end_progress, "Term vnf end.")
129 def update_job(self, progress, desc=''):
130 JobUtil.add_job_status(self.job_id, progress, desc)
132 def wait_until_job_done(self, job_id, global_progress, retry_count=60, interval_second=3):
134 response_id, new_response_id = 0, 0
135 job_end_normal, job_timeout = False, True
136 while count < retry_count:
138 time.sleep(interval_second)
139 ret = req_by_msb(JOB_INSTANCE_RESPONSE_ID_URI % (job_id, response_id), "GET")
141 logger.error("Failed to query job: %s:%s", ret[2], ret[1])
143 job_result = json.JSONDecoder().decode(ret[1])
144 if "responseDescriptor" not in job_result:
145 logger.error("Job(%s) does not exist.", job_id)
147 progress = job_result["responseDescriptor"]["progress"]
148 new_response_id = job_result["responseDescriptor"]["responseId"]
149 job_desc = job_result["responseDescriptor"]["statusDescription"]
150 if new_response_id != response_id:
151 self.update_job(global_progress, job_desc)
152 logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
153 response_id = new_response_id
155 if progress == JOB_PROGRESS.ERROR:
156 if 'already onBoarded' in job_desc:
157 logger.warn("%s:%s", job_id, job_desc)
158 job_end_normal, job_timeout = True, False
159 logger.info("Job(%s) ended", job_id)
162 logger.error("Job(%s) failed: %s", job_id, job_desc)
164 elif progress == 100:
165 job_end_normal, job_timeout = True, False
166 logger.info("Job(%s) ended normally", job_id)
169 logger.error("Job(%s) timeout", job_id)
170 return job_end_normal
172 def wait_func_test_job_done(self, job_id, global_progress, retry_count=60, interval_second=3):
174 response_id, new_response_id = 0, 0
175 job_end_normal, job_timeout = False, True
176 while count < retry_count:
178 time.sleep(interval_second)
179 ret = req_by_msb("/openapi/vnfsdk/v1/functest/taskmanager/testtasks/%s?responseId=%s"
180 % (job_id, response_id), "GET")
182 logger.error("Failed to query job: %s:%s", ret[2], ret[1])
184 job_result = json.JSONDecoder().decode(ret[1])
185 if "responseDescriptor" not in job_result:
186 logger.error("Job(%s) does not exist.", job_id)
188 progress = job_result["responseDescriptor"]["progress"]
189 new_response_id = job_result["responseDescriptor"]["responseId"]
190 job_desc = job_result["responseDescriptor"]["statusDescription"]
191 if new_response_id != response_id:
192 self.update_job(global_progress, job_desc)
193 logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
194 response_id = new_response_id
196 if progress == JOB_PROGRESS.ERROR:
197 if 'already onBoarded' in job_desc:
198 logger.warn("%s:%s", job_id, job_desc)
199 job_end_normal, job_timeout = True, False
200 logger.info("Job(%s) ended", job_id)
203 logger.error("Job(%s) failed: %s", job_id, job_desc)
205 elif progress == 100:
206 job_end_normal, job_timeout = True, False
207 logger.info("Job(%s) ended normally", job_id)
210 logger.error("Job(%s) timeout", job_id)
211 return job_end_normal
213 def load_config(self):
214 json_file = os.path.join(os.path.dirname(__file__), '../biz/verify_vnfs_config.json')
216 json_data = json.JSONDecoder().decode(f.read())