800606053b066df7b62ed40dfc6fd8fb785c12a7
[vfc/nfvo/lcm.git] / lcm / workflows / build_in.py
1 # Copyright 2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import traceback
17 from threading import Thread
18 import time
19
20 from lcm.pub.utils.syscomm import fun_name
21 from lcm.pub.utils.values import ignore_case_get
22 from lcm.pub.utils import restcall
23 from lcm.pub.exceptions import NSLCMException
24 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
25
26 logger = logging.getLogger(__name__)
27
28 RESULT_OK, RESULT_NG = "0", "1"
29 JOB_ERROR = 255
30
31 g_jobs_status = {}
32
33 """
34 format of input_data
35 {
36     "jobId": uuid of job,
37     "nsInstanceId": id of ns instance,
38     "object_context": json format of nsd,
39     "object_additionalParamForNs": json format of additional parameters for ns,
40     "object_additionalParamForVnf": json format of additional parameters for vnf,
41     "vlCount": int type of VL count,
42     "vnfCount: int type of VNF count,
43     "sfcCount": int type of SFC count,
44     "sdnControllerId": uuid of SDN controller
45 }
46 """
47
48
49 def run_ns_instantiate(input_data, occ_id):
50     logger.debug("Enter %s, input_data is %s", fun_name(), input_data)
51     ns_instantiate_ok = False
52     job_id = ignore_case_get(input_data, "jobId")
53     ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
54     nsd_json = ignore_case_get(input_data, "object_context")
55     ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs")
56     vnf_param_json = ignore_case_get(input_data, "object_additionalParamForVnf")
57     pnf_param_json = ignore_case_get(input_data, "object_additionalParamForPnf")
58     vl_count = int(ignore_case_get(input_data, "vlCount", 0))
59     vnf_count = int(ignore_case_get(input_data, "vnfCount", 0))
60     sfc_count = int(ignore_case_get(input_data, "sfcCount", 0))
61     sdnc_id = ignore_case_get(input_data, "sdnControllerId")
62     g_jobs_status[job_id] = [1 for i in range(vnf_count)]
63     try:
64         update_job(job_id, 10, "true", "Start to create VL")
65         for i in range(vl_count):
66             create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json)
67
68         update_job(job_id, 30, "true", "Start to create VNF")
69         jobs = [create_vnf(ns_inst_id, i + 1, vnf_param_json) for i in range(vnf_count)]
70         wait_until_jobs_done(job_id, jobs)
71
72         [confirm_vnf_status(inst_id) for inst_id, _, _ in jobs]
73
74         update_job(job_id, 50, "true", "Start to create PNF")
75         create_pnf(pnf_param_json)
76
77         update_job(job_id, 70, "true", "Start to create SFC")
78         g_jobs_status[job_id] = [1 for i in range(sfc_count)]
79         jobs = [create_sfc(ns_inst_id, i + 1, nsd_json, sdnc_id) for i in range(sfc_count)]
80         wait_until_jobs_done(job_id, jobs)
81
82         [confirm_sfc_status(inst_id) for inst_id, _, _ in jobs]
83
84         update_job(job_id, 90, "true", "Start to post deal")
85         post_deal(ns_inst_id, "true")
86
87         update_job(job_id, 100, "true", "Create NS successfully.")
88         NsLcmOpOcc.update(occ_id, "COMPLETED")
89         ns_instantiate_ok = True
90     except NSLCMException as e:
91         logger.error("Failded to Create NS: %s", e.message)
92         update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
93         NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
94         post_deal(ns_inst_id, "false")
95     except Exception as e:
96         logger.error(traceback.format_exc())
97         update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
98         NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
99         post_deal(ns_inst_id, "false")
100     finally:
101         g_jobs_status.pop(job_id)
102     return ns_instantiate_ok
103
104
105 def create_vl(ns_inst_id, vl_index, nsd, ns_param):
106     uri = "api/nslcm/v1/ns/vls"
107     data = json.JSONEncoder().encode({
108         "nsInstanceId": ns_inst_id,
109         "vlIndex": vl_index,
110         "context": nsd,
111         "additionalParamForNs": ns_param
112     })
113
114     ret = restcall.req_by_msb(uri, "POST", data)
115     if ret[0] != 0:
116         logger.error("Failed to call create_vl(%s): %s", vl_index, ret[1])
117         raise NSLCMException("Failed to call create_vl(index is %s)" % vl_index)
118     ret[1] = json.JSONDecoder().decode(ret[1])
119
120     result = str(ret[1]["result"])
121     detail = ret[1]["detail"]
122     vl_id = ret[1]["vlId"]
123     if result != RESULT_OK:
124         logger.error("Failed to create VL(%s): %s", vl_id, detail)
125         raise NSLCMException("Failed to create VL(%s)" % vl_id)
126
127     logger.debug("Create VL(%s) successfully.", vl_id)
128
129
130 def create_vnf(ns_inst_id, vnf_index, nf_param):
131     uri = "api/nslcm/v1/ns/vnfs"
132     data = json.JSONEncoder().encode({
133         "nsInstanceId": ns_inst_id,
134         "vnfIndex": vnf_index,
135         "additionalParamForVnf": json.JSONDecoder().decode(nf_param)
136     })
137
138     ret = restcall.req_by_msb(uri, "POST", data)
139     if ret[0] != 0:
140         logger.error("Failed to call create_vnf(%s): %s", vnf_index, ret[1])
141         raise NSLCMException("Failed to call create_vnf(index is %s)" % vnf_index)
142     ret[1] = json.JSONDecoder().decode(ret[1])
143
144     vnf_inst_id = ret[1]["vnfInstId"]
145     job_id = ret[1]["jobId"]
146     logger.debug("Create VNF(%s) started.", vnf_inst_id)
147     return vnf_inst_id, job_id, vnf_index - 1
148
149
150 def create_sfc(ns_inst_id, fp_index, nsd_json, sdnc_id):
151     uri = "api/nslcm/v1/ns/sfcs"
152     data = json.JSONEncoder().encode({
153         "nsInstanceId": ns_inst_id,
154         "context": nsd_json,
155         "fpindex": fp_index,
156         "sdnControllerId": sdnc_id
157     })
158
159     ret = restcall.req_by_msb(uri, "POST", data)
160     if ret[0] != 0:
161         logger.error("Failed to call create_sfc(%s): %s", fp_index, ret[1])
162         raise NSLCMException("Failed to call create_sfc(index is %s)" % fp_index)
163     ret[1] = json.JSONDecoder().decode(ret[1])
164
165     sfc_inst_id = ret[1]["sfcInstId"]
166     job_id = ret[1]["jobId"]
167     logger.debug("Create SFC(%s) started.", sfc_inst_id)
168     return sfc_inst_id, job_id, fp_index - 1
169
170
171 def post_deal(ns_inst_id, status):
172     uri = "api/nslcm/v1/ns/{nsInstanceId}/postdeal".format(nsInstanceId=ns_inst_id)
173     data = json.JSONEncoder().encode({
174         "status": status
175     })
176
177     ret = restcall.req_by_msb(uri, "POST", data)
178     if ret[0] != 0:
179         logger.error("Failed to call post_deal(%s): %s", ns_inst_id, ret[1])
180     logger.debug("Call post_deal(%s, %s) successfully.", ns_inst_id, status)
181
182
183 def update_job(job_id, progress, errcode, desc):
184     uri = "api/nslcm/v1/jobs/{jobId}".format(jobId=job_id)
185     data = json.JSONEncoder().encode({
186         "progress": progress,
187         "errcode": errcode,
188         "desc": desc
189     })
190     restcall.req_by_msb(uri, "POST", data)
191
192
193 class JobWaitThread(Thread):
194     """
195     Job Wait
196     """
197
198     def __init__(self, inst_id, job_id, ns_job_id, index):
199         Thread.__init__(self)
200         self.inst_id = inst_id
201         self.job_id = job_id
202         self.ns_job_id = ns_job_id
203         self.index = index
204         self.retry_count = 600
205         self.interval_second = 3
206
207     def run(self):
208         count = 0
209         response_id, new_response_id = 0, 0
210         job_end_normal, job_timeout = False, True
211         while count < self.retry_count:
212             count = count + 1
213             time.sleep(self.interval_second)
214             uri = "/api/nslcm/v1/jobs/%s?responseId=%s" % (self.job_id, response_id)
215             ret = restcall.req_by_msb(uri, "GET")
216             if ret[0] != 0:
217                 logger.error("Failed to query job: %s:%s", ret[2], ret[1])
218                 continue
219             job_result = json.JSONDecoder().decode(ret[1])
220             if "responseDescriptor" not in job_result:
221                 logger.debug("No new progress after response_id(%s) in job(%s)", response_id, self.job_id)
222                 continue
223             progress = job_result["responseDescriptor"]["progress"]
224             new_response_id = job_result["responseDescriptor"]["responseId"]
225             job_desc = job_result["responseDescriptor"]["statusDescription"]
226             if new_response_id != response_id:
227                 logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
228                 response_id = new_response_id
229                 count = 0
230             if progress == JOB_ERROR:
231                 job_timeout = False
232                 logger.error("Job(%s) failed: %s", self.job_id, job_desc)
233                 break
234             elif progress == 100:
235                 job_end_normal, job_timeout = True, False
236                 logger.info("Job(%s) ended normally", self.job_id)
237                 break
238         if job_timeout:
239             logger.error("Job(%s) timeout", self.job_id)
240         if self.ns_job_id in g_jobs_status:
241             if job_end_normal:
242                 g_jobs_status[self.ns_job_id][self.index] = 0
243
244
245 def wait_until_jobs_done(g_job_id, jobs):
246     job_threads = []
247     for inst_id, job_id, index in jobs:
248         job_threads.append(JobWaitThread(inst_id, job_id, g_job_id, index))
249     for t in job_threads:
250         t.start()
251     for t in job_threads:
252         t.join()
253     if g_job_id in g_jobs_status:
254         if sum(g_jobs_status[g_job_id]) > 0:
255             logger.error("g_jobs_status[%s]: %s", g_job_id, g_jobs_status[g_job_id])
256             raise NSLCMException("Some jobs failed!")
257
258
259 def confirm_vnf_status(vnf_inst_id):
260     uri = "api/nslcm/v1/ns/vnfs/{vnfInstId}".format(vnfInstId=vnf_inst_id)
261     ret = restcall.req_by_msb(uri, "GET")
262     if ret[0] != 0:
263         raise NSLCMException("Failed to call get_vnf(%s)" % vnf_inst_id)
264     ret[1] = json.JSONDecoder().decode(ret[1])
265
266     vnf_status = ret[1]["vnfStatus"]
267     if vnf_status != "active":
268         raise NSLCMException("Status of VNF(%s) is not active" % vnf_inst_id)
269
270
271 def confirm_sfc_status(sfc_inst_id):
272     uri = "api/nslcm/v1/ns/sfcs/{sfcInstId}".format(sfcInstId=sfc_inst_id)
273     ret = restcall.req_by_msb(uri, "GET")
274     if ret[0] != 0:
275         raise NSLCMException("Failed to call get_sfc(%s)" % sfc_inst_id)
276     ret[1] = json.JSONDecoder().decode(ret[1])
277
278     sfc_status = ret[1]["sfcStatus"]
279     if sfc_status != "active":
280         raise NSLCMException("Status of SFC(%s) is not active" % sfc_inst_id)
281
282
283 def create_pnf(pnf_param_json):
284     if pnf_param_json and len(pnf_param_json) > 0:
285         pnfs = json.JSONDecoder().decode(pnf_param_json)
286         for pnf in pnfs.itervalues():
287             uri = "/api/nslcm/v1/pnfs"
288             method = "POST"
289             content = json.JSONEncoder().encode(pnf["input"]["content"])
290             ret = restcall.req_by_msb(uri, method, content)
291             if ret[0] != 0:
292                 logger.error("Failed to call create_pnf(%s) result %s", content, ret)
293                 raise NSLCMException("Failed to call create_pnf(%s)" % content)