1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38 from lcm.ns.enum import NS_INST_STATUS
# Module-level logger, named after this module (__name__).
40 logger = logging.getLogger(__name__)
class BuildInWorkflowThread(Thread):
    """Thread that runs the built-in NS instantiation workflow.

    Args:
        plan_input: Workflow input, handed unchanged to
            ``build_in.run_ns_instantiate``.
        occ_id: Operation occurrence id, handed unchanged to
            ``build_in.run_ns_instantiate``.
    """

    def __init__(self, plan_input, occ_id):
        # The Thread initialiser has to run before start() may be called.
        super().__init__()
        self.plan_input = plan_input
        # run() reads this attribute, so it must be stored on the instance.
        self.occ_id = occ_id

    def run(self):
        """Execute the built-in workflow; invoked by ``Thread.start()``."""
        build_in.run_ns_instantiate(self.plan_input, self.occ_id)
53 class InstantNSService(object):
58 def __init__(self, ns_inst_id, plan_content):
59 self.ns_inst_id = ns_inst_id
60 self.req_data = plan_content
# Main NS instantiation flow.
# NOTE(review): the enclosing `def` line is not visible in this excerpt, and
# neither are a few interior lines (e.g. the `try:` that pairs with the
# `except` below, the initial values of `input_parameters`, `vim_id` and
# `params_vnf`, and parts of some literals). The comments describe the
# visible statements only.
#
# Returns whatever the selected start_*_workflow method returns, or a dict
# carrying the error text and HTTP 500 when a step raises.

# Register a job and an LCM operation occurrence, then flag the NS instance
# as INSTANTIATING.
63 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
64 occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
65 NSInstModel.objects.filter(id=self.ns_inst_id).update(status=NS_INST_STATUS.INSTANTIATING)
68 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
69 logger.debug('req_data=%s' % self.req_data)
70 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
# Flatten additionalParamForNs into the key/value list later passed to the
# NSD parser.
74 if 'additionalParamForNs' in self.req_data:
75 for key, val in list(self.req_data['additionalParamForNs'].items()):
76 input_parameters.append({"key": key, "value": val})
# An optional 'location' entry is split on '_' into cloud owner and cloud
# region id and becomes the NS-level VIM.
# NOTE(review): a location value without '_' raises IndexError on the [1]
# access — confirm the format is guaranteed by the caller.
78 if 'location' in self.req_data['additionalParamForNs']:
79 cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
80 cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
81 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
82 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
# Encodes an empty object instead (branch header not visible — presumably the
# `else` of the additionalParamForNs check; TODO confirm).
84 params_json = json.JSONEncoder().encode({})
# Per-VNF VIM choices come from the optional locationConstraints list; a
# missing or falsy value is treated as an empty list.
86 location_constraints = [] if not self.req_data.get('locationConstraints') \
87 else self.req_data['locationConstraints']
88 vnf_vim = self.get_vnf_vim_info(location_constraints)
# Parse the NSD of the instance's package with the collected inputs, call
# modify_nsd_state for that package, and store the parsed plan on the NS
# instance row.
90 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
91 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
92 logger.debug('tosca plan dest: %s' % dst_plan)
93 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
94 sdc_run_catalog.modify_nsd_state(ns_inst.nspackage_id)
95 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
# For every VNF in the decoded plan: read its descriptor id and VNFM type
# ("undefined" when absent, first element when a list), resolve its VIM and
# ask extsys for a matching VNFM.
98 plan_dict = json.JSONDecoder().decode(dst_plan)
99 for vnf in ignore_case_get(plan_dict, "vnfs"):
100 vnfd_id = vnf['properties']['id']
101 vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
102 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
103 if isinstance(vnfm_type, list):
104 vnfm_type = vnfm_type[0]
105 vimid = self.get_vnf_vim_id(vim_id, vnf_vim, vnfd_id)
106 s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
107 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
110 "vnfProfileId": vnf["vnf_id"],
112 "properties": json.JSONEncoder().encode(vnf['properties']),
114 "vnfmInstanceId": vnfm_info["vnfmId"],
115 "vnfmType": vnfm_type,
116 "inputs": params_json
# Write the resolved VIM into each VL of the plan, then re-encode and persist
# the updated plan.
120 self.set_vl_vim_id(vim_id, vnf_vim, plan_dict)
121 dst_plan = json.JSONEncoder().encode(plan_dict)
122 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
123 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
# JSON-encode the PNF and VNF parameter sets that go into the plan input.
125 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
127 vnf_params_json = json.JSONEncoder().encode(params_vnf)
130 'nsInstanceId': self.ns_inst_id,
131 'object_context': dst_plan,
132 'object_additionalParamForNs': params_json,
133 'object_additionalParamForVnf': vnf_params_json,
134 'object_additionalParamForPnf': pnf_params_json
# Add the vl/sfc/vnf/pnf counts derived from the encoded plan.
136 plan_input.update(**self.get_model_count(dst_plan))
# sdnControllerId is read from additionalParamForNs under the key
# "sdncontroller" via ignore_case_get.
138 if 'additionalParamForNs' in self.req_data:
139 plan_input["sdnControllerId"] = ignore_case_get(
140 self.req_data['additionalParamForNs'], "sdncontroller")
# Save a ServiceBaseInfoModel row for this NS instance; create_time is the
# current time in milliseconds.
142 ServiceBaseInfoModel(service_id=self.ns_inst_id,
143 service_name=ns_inst.name,
145 description=ns_inst.description,
147 status=ns_inst.status,
149 create_time=int(time.time() * 1000)).save()
# wso2 only: store the service template mapping for this NS instance.
151 if config.WORKFLOW_OPTION == "wso2":
152 service_tpl = get_servicetemplate(ns_inst.nsd_id)
153 DefPkgMappingModel(service_id=self.ns_inst_id,
154 service_def_id=service_tpl['csarId'],
155 template_name=service_tpl['templateName'],
156 template_id=service_tpl['serviceTemplateId']).save()
# Save one InputParamMappingModel row per additional NS parameter.
# NOTE(review): this indexes 'additionalParamForNs' without the membership
# check used earlier — confirm the key is guaranteed on this path.
158 for key, val in list(self.req_data['additionalParamForNs'].items()):
159 InputParamMappingModel(service_id=self.ns_inst_id,
161 input_value=val).save()
# Create one VNFFG instance row (fresh UUID, endpointnumber 0) per VNFFG in
# the plan.
163 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
164 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
165 vnffginstid=str(uuid.uuid4()),
166 nsinstid=self.ns_inst_id,
167 endpointnumber=0).save()
# Dispatch to the workflow engine named by config.WORKFLOW_OPTION; the
# built-in workflow is the final return (its branch header is not visible).
171 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
172 if config.WORKFLOW_OPTION == "wso2":
173 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
174 elif config.WORKFLOW_OPTION == "activiti":
175 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
176 elif config.WORKFLOW_OPTION == "grapflow":
177 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
179 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
# Failure path: log the traceback, mark the operation occurrence FAILED, set
# the job status to 255, call build_in.post_deal with "false" and answer with
# HTTP 500.
# NOTE(review): e.args[0] raises IndexError for an exception created without
# arguments — confirm that cannot happen here.
181 except Exception as e:
182 logger.error(traceback.format_exc())
183 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.args[0]))
184 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.args[0])
185 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
186 build_in.post_deal(self.ns_inst_id, "false")
187 return dict(data={'error': e.args[0]}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
    """Launch NS instantiation through the wso2 workflow runner.

    :param job_id: job that receives the progress entry
    :param ns_inst: NS instance record; its ``nsd_id`` selects the template
    :param plan_input: plan input forwarded under ``params.planInput``
    :param occ_id: operation occurrence id echoed back on success
    :return: dict with ``data`` and ``status`` (plus ``occ_id`` on success)
    """
    template_id = get_servicetemplate_id(ns_inst.nsd_id)
    request = {
        "processId": get_process_id('init', template_id),
        "params": {"planInput": plan_input},
    }
    logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, request))

    result = workflow_run(request)
    logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, result))
    progress = 'NS inst(%s) wso2 workflow started: %s' % (self.ns_inst_id, result.get('status'))
    JobUtil.add_job_status(job_id, 10, progress)

    # Any status other than 1 is reported as an error with the engine's message.
    if result.get('status') != 1:
        return dict(data={'error': result['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
# Start NS instantiation through the Activiti workflow engine.
# NOTE(review): parts of this method are not visible in this excerpt (the
# docstring delimiters, the condition guarding the `raise`, and most of the
# `data` dict literal); the notes below cover the visible statements only.
212 def start_activiti_workflow(self, job_id, plan_input, occ_id):
214 Start activiti workflow
# Load the stored workflow plans; the first one supplies the process id below.
221 plans = WFPlanModel.objects.filter()
# Guard condition not visible — presumably raised only when no plan exists
# (TODO confirm).
223 raise NSLCMException("No plan is found, you should deploy plan first!")
225 "processId": plans[0].process_id,
# Run the workflow, log the result and record job progress 10 together with
# the returned status.
228 ret = activiti.exec_workflow(data)
229 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
230 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
231 self.ns_inst_id, ret.get('status')))
# Status 1 yields HTTP 200 with the job id; anything else yields HTTP 500
# with the engine's message.
232 if ret.get('status') == 1:
233 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
234 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_buildin_workflow(self, job_id, plan_input, occ_id):
    """Start the built-in workflow on a separate thread.

    :param job_id: job that receives the progress entry
    :param plan_input: plan input handed to the workflow thread
    :param occ_id: operation occurrence id, handed to the thread and echoed back
    :return: dict with the job id, HTTP 200 and the occurrence id
    """
    progress = 'NS inst(%s) buildin workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, progress)
    worker = BuildInWorkflowThread(plan_input, occ_id)
    worker.start()
    return {'data': {'jobId': job_id}, 'status': status.HTTP_200_OK, 'occ_id': occ_id}
def start_buildin_grapflow(self, job_id, plan_input, occ_id):
    """Run the built-in graph workflow in the calling thread.

    :param job_id: job that receives the progress entry
    :param plan_input: plan input handed to ``run_ns_instantiate``
    :param occ_id: operation occurrence id, handed on and echoed back
    :return: dict with the job id, HTTP 200 and the occurrence id
    """
    progress = 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, progress)
    run_ns_instantiate(plan_input, occ_id)
    return {'data': {'jobId': job_id}, 'status': status.HTTP_200_OK, 'occ_id': occ_id}
def get_vnf_vim_id(vim_id, vnf_vim, vnfdid):
    """Pick the VIM to use for one VNF.

    The VNF-specific entry in ``vnf_vim`` wins; when it is missing or falsy
    the NS-level ``vim_id`` is used instead.

    :param vim_id: NS-level VIM info, used as the fallback
    :param vnf_vim: mapping of VNF descriptor id to VIM info
    :param vnfdid: descriptor id of the VNF being placed
    :return: the selected VIM info
    :raises NSLCMException: when neither source yields a VIM
    """
    new_vim_id = vnf_vim.get(vnfdid) or vim_id
    # Only a usable value is returned; the caller indexes into it directly.
    if new_vim_id:
        return new_vim_id
    raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
def get_vnf_vim_info(location_constraints):
    """Build a per-VNF VIM mapping from the request's location constraints.

    Entries without ``vnfProfileId`` are ignored. A constraint holding one
    item is read as ``vimId`` and split on ``'_'`` into cloud owner and cloud
    region id; a constraint holding two items is read as ``cloudOwner`` and
    ``cloudRegionId``. Any other size is skipped.

    :param location_constraints: iterable of location constraint dicts
    :return: dict mapping ``vnfProfileId`` to
        ``{"cloud_owner": ..., "cloud_regionid": ...}``
    """
    # The accumulator must exist before the loop and be returned afterwards:
    # the caller uses the result as a dict.
    vnf_vim = {}
    for location in location_constraints:
        if "vnfProfileId" not in location:
            continue
        constraints = location["locationConstraints"]
        if len(constraints) == 1:
            parts = constraints["vimId"].split('_')
            cloud_owner, cloud_regionid = parts[0], parts[1]
        elif len(constraints) == 2:
            cloud_owner = constraints["cloudOwner"]
            cloud_regionid = constraints["cloudRegionId"]
        else:
            continue
        vnf_vim[location["vnfProfileId"]] = {
            "cloud_owner": cloud_owner,
            "cloud_regionid": cloud_regionid,
        }
    return vnf_vim
def set_vl_vim_id(vim_id, vnf_vim, plan_dict):
    """Write a VIM into ``properties.location_info.vimid`` of every VL.

    Each VL takes the VIM of the VNF that lists it under ``dependencies``;
    when no VNF-specific VIM is found, the NS-level ``vim_id`` is used, the
    same fallback ``get_vnf_vim_id`` applies. ``plan_dict`` is modified in
    place.

    :param vim_id: NS-level VIM info, used as the fallback
    :param vnf_vim: mapping of VNF descriptor id to VIM info
    :param plan_dict: decoded plan; its ``vls`` entries are updated
    :return: None
    """
    if "vls" not in plan_dict:
        logger.debug("No vl is found in nsd.")
        # Nothing to update; continuing would index a missing "vls" key.
        return

    # Map each VL id to the descriptor id of a VNF that depends on it.
    vl_vnf = {}
    for vnf in ignore_case_get(plan_dict, "vnfs"):
        if "dependencies" in vnf:
            for depend in vnf["dependencies"]:
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']

    for vl in plan_dict["vls"]:
        vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
        # Fall back to the NS-level VIM when the VNF has no VIM of its own.
        vimid = ignore_case_get(vnf_vim, vnfdid) or vim_id
        vl["properties"].setdefault("location_info", {})["vimid"] = vimid
def get_model_count(context):
    """Count the entries of each model section in a JSON-encoded plan.

    :param context: JSON text of the plan
    :return: dict with ``vlCount`` (``vls``), ``sfcCount`` (``fps``),
        ``vnfCount`` (``vnfs``) and ``pnfCount`` (``pnfs``); a missing
        section counts as 0
    """
    plan = json.loads(context)
    sections = (
        ('vlCount', 'vls'),
        ('sfcCount', 'fps'),
        ('vnfCount', 'vnfs'),
        ('pnfCount', 'pnfs'),
    )
    return {label: len(plan.get(section, [])) for label, section in sections}
306 def init_pnf_para(self, plan_dict):
307 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData", [])
308 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs", [])
309 logger.debug("addpnfData ; %s" % pnfs_in_input)
310 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
312 for pnf in pnfs_in_input:
313 for pnfd in pnfs_in_nsd:
314 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
316 pnf["nsInstances"] = self.ns_inst_id