Fix vfc-catalog/workflows pep8 issue
[vfc/nfvo/lcm.git] / lcm / workflows / build_in.py
1 # Copyright 2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import traceback
17 from threading import Thread
18 import time
19
20 from lcm.pub.utils.syscomm import fun_name
21 from lcm.pub.utils.values import ignore_case_get
22 from lcm.pub.utils import restcall
23 from lcm.pub.exceptions import NSLCMException
24
25 logger = logging.getLogger(__name__)
26
27 RESULT_OK, RESULT_NG = "0", "1"
28 JOB_ERROR = 255
29
30 g_jobs_status = {}
31
32 """
33 format of input_data
34 {
35     "jobId": uuid of job, 
36     "nsInstanceId": id of ns instance,
37     "object_context": json format of nsd,
38     "object_additionalParamForNs": json format of additional parameters for ns,
39     "object_additionalParamForVnf": json format of additional parameters for vnf,
40     "vlCount": int type of VL count,
41     "vnfCount: int type of VNF count,
42     "sfcCount": int type of SFC count, 
43     "sdnControllerId": uuid of SDN controller
44 }
45 """
46
47
48 def run_ns_instantiate(input_data):
49     logger.debug("Enter %s, input_data is %s", fun_name(), input_data)
50     ns_instantiate_ok = False
51     job_id = ignore_case_get(input_data, "jobId")
52     ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
53     nsd_json = ignore_case_get(input_data, "object_context")
54     ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs")
55     vnf_param_json = ignore_case_get(input_data, "object_additionalParamForVnf")
56     vl_count = ignore_case_get(input_data, "vlCount")
57     vnf_count = ignore_case_get(input_data, "vnfCount")
58     sfc_count = ignore_case_get(input_data, "sfcCount")
59     sdnc_id = ignore_case_get(input_data, "sdnControllerId")
60     g_jobs_status[job_id] = [1 for i in range(vnf_count)]
61     try:
62         update_job(job_id, 10, "0", "Start to create VL")
63         for i in range(vl_count):
64             create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json)
65
66         update_job(job_id, 30, "0", "Start to create VNF")
67         jobs = [create_vnf(ns_inst_id, i + 1, vnf_param_json) for i in range(vnf_count)] 
68         wait_until_jobs_done(job_id, jobs)
69
70         [confirm_vnf_status(inst_id) for inst_id, _, _ in jobs]
71
72         update_job(job_id, 70, "0", "Start to create SFC")
73         g_jobs_status[job_id] = [1 for i in range(sfc_count)]
74         jobs = [create_sfc(ns_inst_id, i + 1, nsd_json, sdnc_id) for i in range(sfc_count)] 
75         wait_until_jobs_done(job_id, jobs)
76
77         [confirm_sfc_status(inst_id) for inst_id, _, _ in jobs]
78
79         update_job(job_id, 90, "0", "Start to post deal")
80         post_deal(ns_inst_id, "true")
81
82         update_job(job_id, 100, "0", "Create NS successfully.")
83         ns_instantiate_ok = True
84     except NSLCMException as e:
85         logger.error("Failded to Create NS: %s", e.message)
86         update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
87         post_deal(ns_inst_id, "false")
88     except:
89         logger.error(traceback.format_exc())
90         update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
91         post_deal(ns_inst_id, "false")
92     finally:
93         g_jobs_status.pop(job_id)
94     return ns_instantiate_ok
95
96
97 def create_vl(ns_inst_id, vl_index, nsd, ns_param):
98     uri = "api/nslcm/v1/ns/vls"
99     data = json.JSONEncoder().encode({
100         "nsInstanceId": ns_inst_id,
101         "vlIndex": vl_index,
102         "context": nsd,
103         "additionalParamForNs": ns_param
104     })
105
106     ret = restcall.req_by_msb(uri, "POST", data)
107     if ret[0] != 0:
108         logger.error("Failed to call create_vl(%s): %s", vl_index, ret[1])
109         raise NSLCMException("Failed to call create_vl(index is %s)" % vl_index)
110     ret[1] = json.JSONDecoder().decode(ret[1])
111
112     result = str(ret[1]["result"])
113     detail = ret[1]["detail"]
114     vl_id = ret[1]["vlId"]
115     if result != RESULT_OK:
116         logger.error("Failed to create VL(%s): %s", vl_id, detail)
117         raise NSLCMException("Failed to create VL(%s)" % vl_id)
118
119     logger.debug("Create VL(%s) successfully.", vl_id)
120
121
122 def create_vnf(ns_inst_id, vnf_index, nf_param):
123     uri = "api/nslcm/v1/ns/vnfs"
124     data = json.JSONEncoder().encode({
125         "nsInstanceId": ns_inst_id,
126         "vnfIndex": vnf_index,
127         "additionalParamForVnf": nf_param
128     })
129
130     ret = restcall.req_by_msb(uri, "POST", data)
131     if ret[0] != 0:
132         logger.error("Failed to call create_vnf(%s): %s", vnf_index, ret[1])
133         raise NSLCMException("Failed to call create_vnf(index is %s)" % vnf_index)
134     ret[1] = json.JSONDecoder().decode(ret[1])
135
136     vnf_inst_id = ret[1]["vnfInstId"]
137     job_id = ret[1]["jobId"]
138     logger.debug("Create VNF(%s) started.", vnf_inst_id)
139     return vnf_inst_id, job_id, vnf_index - 1
140
141
142 def create_sfc(ns_inst_id, fp_index, nsd_json, sdnc_id):
143     uri = "api/nslcm/v1/ns/sfcs"
144     data = json.JSONEncoder().encode({
145         "nsInstanceId": ns_inst_id,
146         "context": nsd_json,
147         "fpindex": fp_index,
148         "sdnControllerId": sdnc_id
149     })
150
151     ret = restcall.req_by_msb(uri, "POST", data)
152     if ret[0] != 0:
153         logger.error("Failed to call create_sfc(%s): %s", fp_index, ret[1])
154         raise NSLCMException("Failed to call create_sfc(index is %s)" % fp_index)
155     ret[1] = json.JSONDecoder().decode(ret[1])
156
157     sfc_inst_id = ret[1]["sfcInstId"]
158     job_id = ret[1]["jobId"]
159     logger.debug("Create SFC(%s) started.", sfc_inst_id)
160     return sfc_inst_id, job_id, fp_index - 1
161
162
163 def post_deal(ns_inst_id, status):
164     uri = "api/nslcm/v1/ns/{nsInstanceId}/postdeal".format(nsInstanceId=ns_inst_id) 
165     data = json.JSONEncoder().encode({
166         "status": status
167     })
168
169     ret = restcall.req_by_msb(uri, "POST", data)
170     if ret[0] != 0:
171         logger.error("Failed to call post_deal(%s): %s", ns_inst_id, ret[1])
172     logger.debug("Call post_deal(%s, %s) successfully.", ns_inst_id, status)
173
174
175 def update_job(job_id, progress, errcode, desc):
176     uri = "api/nslcm/v1/jobs/{jobId}".format(jobId=job_id)
177     data = json.JSONEncoder().encode({
178         "progress": progress,
179         "errcode": errcode,
180         "desc": desc
181     })
182     restcall.req_by_msb(uri, "POST", data)  
183
184
185 class JobWaitThread(Thread):
186     """
187     Job Wait 
188     """
189     def __init__(self, inst_id, job_id, ns_job_id, index):
190         Thread.__init__(self)
191         self.inst_id = inst_id
192         self.job_id = job_id
193         self.ns_job_id = ns_job_id
194         self.index = index
195         self.retry_count = 60
196         self.interval_second = 3
197
198     def run(self):
199         count = 0
200         response_id, new_response_id = 0, 0
201         job_end_normal, job_timeout = False, True
202         while count < self.retry_count:
203             count = count + 1
204             time.sleep(self.interval_second)
205             uri = "/api/nslcm/v1/jobs/%s?responseId=%s" % (self.job_id, response_id)
206             ret = restcall.req_by_msb(uri, "GET")
207             if ret[0] != 0:
208                 logger.error("Failed to query job: %s:%s", ret[2], ret[1])
209                 continue
210             job_result = json.JSONDecoder().decode(ret[1])
211             if "responseDescriptor" not in job_result:
212                 logger.error("Job(%s) does not exist.", self.job_id)
213                 continue
214             progress = job_result["responseDescriptor"]["progress"]
215             new_response_id = job_result["responseDescriptor"]["responseId"]
216             job_desc = job_result["responseDescriptor"]["statusDescription"]
217             if new_response_id != response_id:
218                 logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
219                 response_id = new_response_id
220                 count = 0
221             if progress == JOB_ERROR:
222                 job_timeout = False
223                 logger.error("Job(%s) failed: %s", self.job_id, job_desc)
224                 break
225             elif progress == 100:
226                 job_end_normal, job_timeout = True, False
227                 logger.info("Job(%s) ended normally", self.job_id)
228                 break
229         if job_timeout:
230             logger.error("Job(%s) timeout", self.job_id)
231         if self.ns_job_id in g_jobs_status:
232             if job_end_normal:
233                 g_jobs_status[self.ns_job_id][self.index] = 0
234
235
236 def wait_until_jobs_done(g_job_id, jobs):
237     job_threads = []
238     for inst_id, job_id, index in jobs:
239         job_threads.append(JobWaitThread(inst_id, job_id, g_job_id, index))
240     for t in job_threads:
241         t.start()
242     for t in job_threads:
243         t.join()
244     if g_job_id in g_jobs_status:
245         if sum(g_jobs_status[g_job_id]) > 0:
246             logger.error("g_jobs_status[%s]: %s", g_job_id, g_jobs_status[g_job_id])
247             raise NSLCMException("Some jobs failed!")
248
249
250 def confirm_vnf_status(vnf_inst_id):
251     uri = "api/nslcm/v1/ns/vnfs/{vnfInstId}".format(vnfInstId=vnf_inst_id)
252     ret = restcall.req_by_msb(uri, "GET")
253     if ret[0] != 0:
254         raise NSLCMException("Failed to call get_vnf(%s)" % vnf_inst_id)
255     ret[1] = json.JSONDecoder().decode(ret[1])
256
257     vnf_status = ret[1]["vnfStatus"]
258     if vnf_status != "active":
259         raise NSLCMException("Status of VNF(%s) is not active" % vnf_inst_id)
260
261
262 def confirm_sfc_status(sfc_inst_id):
263     uri = "api/nslcm/v1/ns/sfcs/{sfcInstId}".format(sfcInstId=sfc_inst_id)
264     ret = restcall.req_by_msb(uri, "GET")
265     if ret[0] != 0:
266         raise NSLCMException("Failed to call get_sfc(%s)" % sfc_inst_id)
267     ret[1] = json.JSONDecoder().decode(ret[1])
268
269     sfc_status = ret[1]["sfcStatus"]
270     if sfc_status != "active":
271         raise NSLCMException("Status of SFC(%s) is not active" % sfc_inst_id)