Update python2 to python3
[vfc/nfvo/lcm.git] / lcm / ns_vnfs / biz / verify_vnfs.py
1 # Copyright 2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import os
17 import threading
18 import traceback
19 import time
20
21 from lcm.pub.exceptions import NSLCMException
22 from lcm.pub.utils.jobutil import JobUtil
23 from lcm.pub.enum import JOB_TYPE, JOB_PROGRESS
24 from lcm.pub.utils.values import ignore_case_get
25 from lcm.pub.utils.restcall import req_by_msb
26
27
28 logger = logging.getLogger(__name__)
29
30
31 class VerifyVnfs(threading.Thread):
32     def __init__(self, data, job_id):
33         super(VerifyVnfs, self).__init__()
34         self.data = data
35         self.job_id = job_id
36         self.vnf_inst_id = ''
37         self.verify_ok = False
38         self.verify_config = ''
39
40     def run(self):
41         try:
42             self.verify_config = self.load_config()
43             JobUtil.create_job("VNF", JOB_TYPE.CREATE_VNF, self.job_id, 'vnfsdk', self.job_id)
44             self.do_on_boarding()
45             self.do_inst_vnf()
46             self.do_func_test()
47             self.verify_ok = True
48         except NSLCMException as e:
49             self.update_job(JOB_PROGRESS.ERROR, e.args[0])
50         except:
51             logger.error(traceback.format_exc())
52             self.update_job(JOB_PROGRESS.ERROR, 'Unknown error in vnf verify.')
53         finally:
54             logger.warn("Ignore terminate vnf operation")
55             if self.verify_ok:
56                 self.update_job(100, "Ignore terminate vnf operation.")
57
58     def do_on_boarding(self):
59         self.update_job(10, "Start vnf on boarding.")
60         onboarding_data = {
61             "csarId": self.data["PackageID"],
62             "labVimId": ignore_case_get(self.verify_config, "labVimId")
63         }
64         ret = req_by_msb("/api/nslcm/v1/vnfpackage", "POST", json.JSONEncoder().encode(onboarding_data))
65         if ret[0] != 0:
66             raise NSLCMException("Failed to call vnf onboarding: %s" % ret[1])
67         rsp_data = json.JSONDecoder().decode(ret[1])
68         if not self.wait_until_job_done(rsp_data["jobId"], 15):
69             raise NSLCMException("Vnf onboarding failed")
70         self.update_job(20, "Vnf on boarding success.")
71
72     def do_inst_vnf(self):
73         self.update_job(30, "Start inst vnf.")
74         vnf_param = ignore_case_get(self.verify_config, "additionalParamForVnf")
75         if vnf_param and "additionalParam" in vnf_param[0]:
76             vnf_param[0]["additionalParam"]["vimId"] = ignore_case_get(self.verify_config, "lab_vim_id")
77         inst_data = {
78             "nsInstanceId": "",
79             "additionalParamForVnf": vnf_param,
80             "vnfIndex": "1"
81         }
82         ret = req_by_msb("/api/nslcm/v1/ns/ns_vnfs", "POST", json.JSONEncoder().encode(inst_data))
83         if ret[0] != 0:
84             raise NSLCMException("Failed to call inst vnf: %s" % ret[1])
85         rsp_data = json.JSONDecoder().decode(ret[1])
86         self.vnf_inst_id = rsp_data["vnfInstId"]
87         if not self.wait_until_job_done(rsp_data["jobId"], 40):
88             raise NSLCMException("Vnf(%s) inst failed" % self.vnf_inst_id)
89         self.update_job(50, "Inst vnf success.")
90
91     def do_func_test(self):
92         self.update_job(60, "Start vnf function test.")
93         func_data = {"PackageID": self.data["PackageID"]}
94         ret = req_by_msb("/openapi/vnfsdk/v1/functest/taskmanager/testtasks", "POST", json.JSONEncoder().encode(func_data))
95         if ret[0] != 0:
96             raise NSLCMException("Failed to call func test: %s" % ret[1])
97         rsp_data = json.JSONDecoder().decode(ret[1])
98
99         if not self.wait_func_test_job_done(rsp_data["TaskID"], 40):
100             raise NSLCMException("Func test failed")
101         logger.info("Query(%s) job success.", rsp_data["TaskID"])
102
103         ret = req_by_msb("/openapi/vnfsdk/v1/functest/taskmanager/testtasks/%s/result" % rsp_data["TaskID"], "GET")
104         if ret[0] != 0:
105             raise NSLCMException("Failed to get func test result: %s" % ret[1])
106         rsp_result_data = json.JSONDecoder().decode(ret[1])
107         logger.info("Func test(%s) result: %s", rsp_result_data)
108         self.update_job(80, "Vnf function test success.")
109
110     def do_term_vnf(self):
111         if not self.vnf_inst_id:
112             return
113         self.update_job(90, "Start term vnf.")
114         term_data = {
115             "terminationType": "forceful",
116             "gracefulTerminationTimeout": "600"
117         }
118         ret = req_by_msb("/api/nslcm/v1/ns/ns_vnfs/%s" % self.vnf_inst_id, "POST", json.JSONEncoder().encode(term_data))
119         if ret[0] != 0:
120             raise NSLCMException("Failed to call term vnf: %s" % ret[1])
121         rsp_data = json.JSONDecoder().decode(ret[1])
122         end_progress = 100 if self.verify_ok else JOB_PROGRESS.ERROR
123         term_progress = 95 if self.verify_ok else JOB_PROGRESS.ERROR
124         if not self.wait_until_job_done(rsp_data["jobId"], term_progress):
125             logger.error("Vnf(%s) term failed", self.vnf_inst_id)
126             end_progress = JOB_PROGRESS.ERROR
127         self.update_job(end_progress, "Term vnf end.")
128
129     def update_job(self, progress, desc=''):
130         JobUtil.add_job_status(self.job_id, progress, desc)
131
132     def wait_until_job_done(self, job_id, global_progress, retry_count=60, interval_second=3):
133         count = 0
134         response_id, new_response_id = 0, 0
135         job_end_normal, job_timeout = False, True
136         while count < retry_count:
137             count = count + 1
138             time.sleep(interval_second)
139             ret = req_by_msb("/api/nslcm/v1/jobs/%s?responseId=%s" % (job_id, response_id), "GET")
140             if ret[0] != 0:
141                 logger.error("Failed to query job: %s:%s", ret[2], ret[1])
142                 continue
143             job_result = json.JSONDecoder().decode(ret[1])
144             if "responseDescriptor" not in job_result:
145                 logger.error("Job(%s) does not exist.", job_id)
146                 continue
147             progress = job_result["responseDescriptor"]["progress"]
148             new_response_id = job_result["responseDescriptor"]["responseId"]
149             job_desc = job_result["responseDescriptor"]["statusDescription"]
150             if new_response_id != response_id:
151                 self.update_job(global_progress, job_desc)
152                 logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
153                 response_id = new_response_id
154                 count = 0
155             if progress == JOB_PROGRESS.ERROR:
156                 if 'already onBoarded' in job_desc:
157                     logger.warn("%s:%s", job_id, job_desc)
158                     job_end_normal, job_timeout = True, False
159                     logger.info("Job(%s) ended", job_id)
160                     break
161                 job_timeout = False
162                 logger.error("Job(%s) failed: %s", job_id, job_desc)
163                 break
164             elif progress == 100:
165                 job_end_normal, job_timeout = True, False
166                 logger.info("Job(%s) ended normally", job_id)
167                 break
168         if job_timeout:
169             logger.error("Job(%s) timeout", job_id)
170         return job_end_normal
171
172     def wait_func_test_job_done(self, job_id, global_progress, retry_count=60, interval_second=3):
173         count = 0
174         response_id, new_response_id = 0, 0
175         job_end_normal, job_timeout = False, True
176         while count < retry_count:
177             count = count + 1
178             time.sleep(interval_second)
179             ret = req_by_msb("/openapi/vnfsdk/v1/functest/taskmanager/testtasks/%s?responseId=%s"
180                              % (job_id, response_id), "GET")
181             if ret[0] != 0:
182                 logger.error("Failed to query job: %s:%s", ret[2], ret[1])
183                 continue
184             job_result = json.JSONDecoder().decode(ret[1])
185             if "responseDescriptor" not in job_result:
186                 logger.error("Job(%s) does not exist.", job_id)
187                 continue
188             progress = job_result["responseDescriptor"]["progress"]
189             new_response_id = job_result["responseDescriptor"]["responseId"]
190             job_desc = job_result["responseDescriptor"]["statusDescription"]
191             if new_response_id != response_id:
192                 self.update_job(global_progress, job_desc)
193                 logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
194                 response_id = new_response_id
195                 count = 0
196             if progress == JOB_PROGRESS.ERROR:
197                 if 'already onBoarded' in job_desc:
198                     logger.warn("%s:%s", job_id, job_desc)
199                     job_end_normal, job_timeout = True, False
200                     logger.info("Job(%s) ended", job_id)
201                     break
202                 job_timeout = False
203                 logger.error("Job(%s) failed: %s", job_id, job_desc)
204                 break
205             elif progress == 100:
206                 job_end_normal, job_timeout = True, False
207                 logger.info("Job(%s) ended normally", job_id)
208                 break
209         if job_timeout:
210             logger.error("Job(%s) timeout", job_id)
211         return job_end_normal
212
213     def load_config(self):
214         json_file = os.path.join(os.path.dirname(__file__), '../biz/verify_vnfs_config.json')
215         f = open(json_file)
216         json_data = json.JSONDecoder().decode(f.read())
217         f.close()
218         return json_data