1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
# Module-level logger, named after this module via __name__; used by the
# workflow service code below for debug/info/error output.
39 logger = logging.getLogger(__name__)
class BuildInWorkflowThread(Thread):
    """Background thread that runs the built-in NS instantiation workflow.

    The workflow is executed from ``run()``, i.e. only once ``start()`` has
    been called, so constructing the object never blocks the caller.
    """

    def __init__(self, plan_input, occ_id):
        """Remember the arguments that are later handed to the workflow.

        :param plan_input: first argument passed to
            ``build_in.run_ns_instantiate``.
        :param occ_id: second argument passed to
            ``build_in.run_ns_instantiate``.
        """
        # The base initialiser must run before the thread can be started.
        Thread.__init__(self)
        self.plan_input = plan_input
        self.occ_id = occ_id

    def run(self):
        """Thread body: execute the built-in instantiate workflow."""
        build_in.run_ns_instantiate(self.plan_input, self.occ_id)
class InstantNSService(object):
    """Service object bound to one NS instance id and one request payload."""

    def __init__(self, ns_inst_id, plan_content):
        """Keep the request context on the instance.

        :param ns_inst_id: stored as ``self.ns_inst_id``.
        :param plan_content: request payload, stored as ``self.req_data``.
        """
        self.req_data = plan_content
        self.ns_inst_id = ns_inst_id
# Body of the NS instantiation entry point (its `def` line and several
# structural lines are not part of this excerpt; presumably a method of
# InstantNSService, since it reads self.ns_inst_id and self.req_data).
# Flow: open a job and an LCM operation occurrence, parse the NSD, look up a
# VNFM for every VNF, persist bookkeeping rows, then hand the assembled plan
# input to the workflow engine selected by config.WORKFLOW_OPTION.
58 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
# The request payload is stored with the occurrence, created as "PROCESSING".
59 occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
62 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
63 logger.debug('req_data=%s' % self.req_data)
64 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
# Legacy variant kept for reference: it used the raw 'location' string as
# vim_id instead of splitting it into owner/region.
67 # input_parameters = []
68 # if 'additionalParamForNs' in self.req_data:
69 # for key, val in self.req_data['additionalParamForNs'].items():
70 # input_parameters.append({"key": key, "value": val})
71 # if 'location' in self.req_data['additionalParamForNs']:
72 # vim_id = self.req_data['additionalParamForNs']['location']
73 # params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
75 # params_json = json.JSONEncoder().encode({})
# Flatten the NS-level parameters into a list of {"key", "value"} entries.
# NOTE(review): the initialisation of `input_parameters` (and of `vim_id` for
# requests without a 'location') is not visible in this excerpt - confirm.
77 if 'additionalParamForNs' in self.req_data:
78 for key, val in self.req_data['additionalParamForNs'].items():
79 input_parameters.append({"key": key, "value": val})
# 'location' is split on '_': field 0 becomes cloud_owner, field 1 becomes
# cloud_regionid; any further '_'-separated fields are ignored.
81 if 'location' in self.req_data['additionalParamForNs']:
82 cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
83 cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
84 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
85 logger.info('vim_id::::::::::::%s', type(vim_id))
# NS parameters as a JSON string; the second assignment encodes an empty
# object (presumably the branch for requests without the key - confirm).
86 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
88 params_json = json.JSONEncoder().encode({})
# Optional per-VNF placement hints supplied by the request.
90 location_constraints = []
91 if 'locationConstraints' in self.req_data:
92 location_constraints = self.req_data['locationConstraints']
# Job progress 5: parse the NSD package with the collected input parameters
# and store the parsed model on the NS instance row.
94 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
95 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
96 logger.debug('tosca plan dest: %s' % dst_plan)
97 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
98 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
# Per VNF in the parsed plan: resolve its VIM and query extsys.select_vnfm.
101 plan_dict = json.JSONDecoder().decode(dst_plan)
102 for vnf in ignore_case_get(plan_dict, "vnfs"):
103 vnfd_id = vnf['properties']['id']
104 vnfm_type_temp = vnf['properties'].get("vnfm_info", "undefined")
105 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type_temp)
# vnfm_info may be a list; in that case only its first element is used.
106 vnfm_type = vnfm_type_temp
107 if isinstance(vnfm_type_temp, list):
108 vnfm_type = vnfm_type_temp[0]
109 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
110 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
# Entries of the per-VNF parameter record.
# NOTE(review): the enclosing dict/list construction (presumably feeding
# `params_vnf`, encoded further down) is not visible in this excerpt.
113 "vnfProfileId": vnf["vnf_id"],
115 "properties": json.JSONEncoder().encode(vnf['properties']),
117 "vnfmInstanceId": vnfm_info["vnfmId"],
118 "vnfmType": vnfm_type,
119 "inputs": params_json
# Let set_vl_vim_id update the plan's VLs, then re-serialise the plan and
# persist it again on the NS instance row.
123 self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
124 dst_plan = json.JSONEncoder().encode(plan_dict)
125 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
126 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
128 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
130 vnf_params_json = json.JSONEncoder().encode(params_vnf)
# Entries of the workflow plan input (the statement opening the `plan_input`
# dict is not visible in this excerpt).
133 'nsInstanceId': self.ns_inst_id,
134 'object_context': dst_plan,
135 'object_additionalParamForNs': params_json,
136 'object_additionalParamForVnf': vnf_params_json,
137 'object_additionalParamForPnf': pnf_params_json
# Merge in the vl/sfc/vnf/pnf counts computed by get_model_count.
139 plan_input.update(**self.get_model_count(dst_plan))
141 if 'additionalParamForNs' in self.req_data:
142 plan_input["sdnControllerId"] = ignore_case_get(
143 self.req_data['additionalParamForNs'], "sdncontroller")
# Bookkeeping row for the service; create_time is in milliseconds
# (time.time() * 1000). Some keyword arguments are not visible here.
145 ServiceBaseInfoModel(service_id=self.ns_inst_id,
146 service_name=ns_inst.name,
148 description=ns_inst.description,
150 status=ns_inst.status,
152 create_time=int(time.time() * 1000)).save()
# wso2 only: record which service template the instance was created from.
154 if config.WORKFLOW_OPTION == "wso2":
155 service_tpl = get_servicetemplate(ns_inst.nsd_id)
156 DefPkgMappingModel(service_id=self.ns_inst_id,
157 service_def_id=service_tpl['csarId'],
158 template_name=service_tpl['templateName'],
159 template_id=service_tpl['serviceTemplateId']).save()
# One InputParamMappingModel row per NS-level parameter.
# NOTE(review): indexed without the membership check used earlier; unless a
# guard exists on a line not shown here, a request lacking
# 'additionalParamForNs' raises KeyError at this point - confirm.
161 for key, val in self.req_data['additionalParamForNs'].items():
162 InputParamMappingModel(service_id=self.ns_inst_id,
164 input_value=val).save()
# One VNFFG instance row per VNFFG in the plan, each with a fresh UUID.
166 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
167 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
168 vnffginstid=str(uuid.uuid4()),
169 nsinstid=self.ns_inst_id,
170 endpointnumber=0).save()
# Dispatch on the configured engine; the last return hands over to the
# built-in workflow (presumably the fallback for any other option value).
174 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
175 if config.WORKFLOW_OPTION == "wso2":
176 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
177 elif config.WORKFLOW_OPTION == "activiti":
178 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
179 elif config.WORKFLOW_OPTION == "grapflow":
180 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
182 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
# Failure path: log the traceback, mark the occurrence FAILED, set the job
# status to 255 and answer with HTTP 500.
# NOTE(review): built-in exceptions have no `.message` attribute on
# Python 3 - confirm every exception reaching this handler provides one.
184 except Exception as e:
185 logger.error(traceback.format_exc())
186 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
187 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
188 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
189 return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
    """Start NS instantiation through the wso2 workflow engine.

    Looks up the 'init' process of the service template that belongs to
    ``ns_inst.nsd_id`` and submits ``plan_input`` to it.

    :param job_id: job whose progress is set to 10 once the call returned.
    :param ns_inst: NS instance row; only ``nsd_id`` is read.
    :param plan_input: plan input, sent as ``params.planInput``.
    :param occ_id: operation occurrence id, echoed back on success so the
        response has the same shape as the other ``start_*`` methods.
    :return: dict with ``data`` and ``status`` (plus ``occ_id`` on success);
        HTTP 200 when the engine reports status 1, HTTP 500 otherwise.
    """
    servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
    process_id = get_process_id('init', servicetemplate_id)
    data = {"processId": process_id, "params": {"planInput": plan_input}}
    logger.debug('ns-instant(%s) workflow data:%s', self.ns_inst_id, data)

    ret = workflow_run(data)
    logger.info("ns-instant(%s) workflow result:%s", self.ns_inst_id, ret)
    workflow_status = ret.get('status')
    JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
        self.ns_inst_id, workflow_status))
    if workflow_status == 1:
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
    return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# Start NS instantiation through the Activiti workflow engine.
# The process id of the first WFPlanModel row selects the workflow to run.
# Returns a dict with `data` and `status`: HTTP 200 (plus `occ_id`) when the
# engine reports status 1, otherwise HTTP 500 with the engine's message.
206 def start_activiti_workflow(self, job_id, plan_input, occ_id):
208 plans = WFPlanModel.objects.filter()
# NOTE(review): the condition guarding this raise is not visible in this
# excerpt; presumably it fires only when no plan rows exist - confirm.
210 raise NSLCMException("No plan is found, you should deploy plan first!")
# Entry of the request payload `data` passed to activiti.exec_workflow; the
# rest of that dict is not visible in this excerpt.
212 "processId": plans[0].process_id,
215 ret = activiti.exec_workflow(data)
216 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
# Job progress 10 records the status value reported by the engine.
217 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
218 self.ns_inst_id, ret.get('status')))
219 if ret.get('status') == 1:
220 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
# ret['message'] is indexed directly, so a response without that key raises
# KeyError here instead of producing the HTTP 500 dict.
221 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_buildin_workflow(self, job_id, plan_input, occ_id):
    """Run the built-in workflow on a background thread.

    Sets the job progress to 10, starts a ``BuildInWorkflowThread`` with the
    plan input and occurrence id, and answers immediately.

    :return: dict with ``data`` (the job id), ``status`` (HTTP 200) and
        ``occ_id``.
    """
    started_msg = 'NS inst(%s) buildin workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, started_msg)

    worker = BuildInWorkflowThread(plan_input, occ_id)
    worker.start()

    return {
        'data': {'jobId': job_id},
        'status': status.HTTP_200_OK,
        'occ_id': occ_id,
    }
def start_buildin_grapflow(self, job_id, plan_input, occ_id):
    """Run the built-in graph workflow in the calling thread.

    Sets the job progress to 10, then calls ``run_ns_instantiate`` directly,
    so this method returns only after that call has returned.

    :return: dict with ``data`` (the job id), ``status`` (HTTP 200) and
        ``occ_id``.
    """
    started_msg = 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, started_msg)

    run_ns_instantiate(plan_input, occ_id)

    return {
        'data': {'jobId': job_id},
        'status': status.HTTP_200_OK,
        'occ_id': occ_id,
    }
234 # def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
235 # for location in location_constraints:
236 # if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
237 # return location["locationConstraints"]["vimId"]
240 # raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
    """Resolve the VIM to use for one VNF.

    Takes no ``self`` although it is invoked as ``self.get_vnf_vim_id(...)``;
    presumably declared as a static method (decorator not visible here).

    :param vim_id: NS-level default, returned when no constraint matches.
    :param location_constraints: iterable of dicts; an entry applies when its
        'vnfProfileId' equals ``vnfdid``.
    :param vnfdid: VNFD id to look up.
    :return: ``{"cloud_owner": ..., "cloud_regionid": ...}`` built from the
        matching constraint, or ``vim_id`` when nothing matches.
    :raises NSLCMException: when nothing matches and ``vim_id`` is empty.
    """
    for location in location_constraints:
        if "vnfProfileId" not in location or vnfdid != location["vnfProfileId"]:
            continue
        constraints = location['locationConstraints']
        if 'vimId' in constraints:
            # 'vimId' is split on '_': field 0 is the owner, field 1 the region.
            fields = constraints["vimId"].split('_')
            return {"cloud_owner": fields[0], "cloud_regionid": fields[1]}
        # Without 'vimId' the owner and region are given as separate keys.
        return {
            "cloud_owner": constraints["cloudOwner"],
            "cloud_regionid": constraints["cloudRegionId"],
        }
    if vim_id:
        return vim_id
    raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
def set_vl_vim_id(vim_id, location_constraints, plan_dict):
    """Write the VIM of every virtual link into ``plan_dict`` in place.

    A VL takes the VIM of the VNF that lists it under 'dependencies'; when
    that VNF has no location constraint, the NS-level ``vim_id`` is used.
    Takes no ``self`` although it is invoked as ``self.set_vl_vim_id(...)``;
    presumably declared as a static method (decorator not visible here).

    :param vim_id: NS-level default VIM, used as fallback.
    :param location_constraints: iterable of dicts carrying 'vnfProfileId'
        and 'locationConstraints'.
    :param plan_dict: parsed NSD; each entry of ``plan_dict["vls"]`` gets
        ``properties.location_info.vimid`` set.
    :raises NSLCMException: when neither a constraint nor ``vim_id`` yields a
        VIM for a VL.
    """
    if "vls" not in plan_dict:
        logger.debug("No vl is found in nsd.")
        return

    # VL id -> id of the VNF that depends on it.
    vl_vnf = {}
    for vnf in ignore_case_get(plan_dict, "vnfs"):
        if "dependencies" in vnf:
            for depend in vnf["dependencies"]:
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']

    # VNFD id -> VIM taken from its location constraint. A separate local is
    # used so the NS-level `vim_id` fallback is not overwritten by the loop.
    vnf_vim = {}
    for location in location_constraints:
        if "vnfProfileId" not in location:
            continue
        constraints = location["locationConstraints"]
        if 'vimId' in constraints:
            # 'vimId' is split on '_': field 0 is the owner, field 1 the region.
            fields = constraints["vimId"].split('_')
            constraint_vim = {"cloud_owner": fields[0], "cloud_regionid": fields[1]}
        else:
            constraint_vim = {
                "cloud_owner": constraints["cloudOwner"],
                "cloud_regionid": constraints["cloudRegionId"],
            }
        vnf_vim[location["vnfProfileId"]] = constraint_vim

    for vl in plan_dict["vls"]:
        vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
        vimid = ignore_case_get(vnf_vim, vnfdid) or vim_id
        if not vimid:
            raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
        if "location_info" not in vl["properties"]:
            vl["properties"]["location_info"] = {}
        vl["properties"]["location_info"]["vimid"] = vimid
def get_model_count(context):
    """Count the top-level model elements of a serialized context.

    :param context: JSON text whose top level is an object.
    :return: dict with 'vlCount', 'sfcCount', 'vnfCount' and 'pnfCount' as
        decimal strings, taken from the lengths of the 'vls', 'fps', 'vnfs'
        and 'pnfs' lists; a missing list counts as 0.
    """
    model = json.JSONDecoder().decode(context)
    # (result key, section of the model whose length is reported)
    sections = (
        ('vlCount', 'vls'),
        ('sfcCount', 'fps'),
        ('vnfCount', 'vnfs'),
        ('pnfCount', 'pnfs'),
    )
    return {label: str(len(model.get(section, []))) for label, section in sections}
306 def init_pnf_para(self, plan_dict):
307 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
308 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
309 logger.debug("addpnfData ; %s" % pnfs_in_input)
310 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
312 for pnf in pnfs_in_input:
313 for pnfd in pnfs_in_nsd:
314 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
316 pnf["nsInstances"] = self.ns_inst_id