1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import json
import logging
import time
import traceback
import uuid
from threading import Thread

from rest_framework import status

from lcm.pub.config import config
from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
from lcm.pub.exceptions import NSLCMException
from lcm.pub.msapi import activiti
from lcm.pub.msapi import sdc_run_catalog
from lcm.pub.msapi.catalog import get_process_id
from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
from lcm.pub.msapi import extsys
from lcm.pub.msapi.wso2bpel import workflow_run
from lcm.pub.utils.jobutil import JobUtil
from lcm.pub.utils.values import ignore_case_get
from lcm.workflows import build_in
from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
39 logger = logging.getLogger(__name__)
class BuildInWorkflowThread(Thread):
    """Thread that runs the built-in NS instantiation workflow.

    The work is delegated to ``build_in.run_ns_instantiate`` from ``run()``,
    so it only begins once the caller invokes ``start()``.
    """

    def __init__(self, plan_input, occ_id):
        """Store the workflow arguments; no work is started here.

        :param plan_input: value handed unchanged to
            ``build_in.run_ns_instantiate`` as its first argument.
        :param occ_id: value handed unchanged to
            ``build_in.run_ns_instantiate`` as its second argument.
        """
        # Thread's own initialiser must run before start() can be used.
        Thread.__init__(self)
        self.plan_input = plan_input
        self.occ_id = occ_id

    def run(self):
        """Execute the built-in instantiation workflow inside this thread."""
        build_in.run_ns_instantiate(self.plan_input, self.occ_id)
class InstantNSService(object):
    """Service object that handles the instantiation request of one NS instance."""

    def __init__(self, ns_inst_id, plan_content):
        """Remember the target NS instance and the request payload.

        :param ns_inst_id: NS instance identifier, kept as ``self.ns_inst_id``.
        :param plan_content: request payload, kept as ``self.req_data``.
        """
        self.ns_inst_id, self.req_data = ns_inst_id, plan_content
58 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
59 occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
62 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
63 logger.debug('req_data=%s' % self.req_data)
64 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
68 if 'additionalParamForNs' in self.req_data:
69 for key, val in list(self.req_data['additionalParamForNs'].items()):
70 input_parameters.append({"key": key, "value": val})
72 if 'location' in self.req_data['additionalParamForNs']:
73 cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
74 cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
75 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
76 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
78 params_json = json.JSONEncoder().encode({})
80 location_constraints = []
81 if 'locationConstraints' in self.req_data:
82 location_constraints = self.req_data['locationConstraints']
84 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
85 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
86 logger.debug('tosca plan dest: %s' % dst_plan)
87 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
88 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
91 plan_dict = json.JSONDecoder().decode(dst_plan)
92 for vnf in ignore_case_get(plan_dict, "vnfs"):
93 vnfd_id = vnf['properties']['id']
94 vnfm_type_temp = vnf['properties'].get("vnfm_info", "undefined")
95 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type_temp)
96 vnfm_type = vnfm_type_temp
97 if isinstance(vnfm_type_temp, list):
98 vnfm_type = vnfm_type_temp[0]
99 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
100 s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
101 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
104 "vnfProfileId": vnf["vnf_id"],
106 "properties": json.JSONEncoder().encode(vnf['properties']),
108 "vnfmInstanceId": vnfm_info["vnfmId"],
109 "vnfmType": vnfm_type,
110 "inputs": params_json
114 self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
115 dst_plan = json.JSONEncoder().encode(plan_dict)
116 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
117 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
119 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
121 vnf_params_json = json.JSONEncoder().encode(params_vnf)
124 'nsInstanceId': self.ns_inst_id,
125 'object_context': dst_plan,
126 'object_additionalParamForNs': params_json,
127 'object_additionalParamForVnf': vnf_params_json,
128 'object_additionalParamForPnf': pnf_params_json
130 plan_input.update(**self.get_model_count(dst_plan))
132 if 'additionalParamForNs' in self.req_data:
133 plan_input["sdnControllerId"] = ignore_case_get(
134 self.req_data['additionalParamForNs'], "sdncontroller")
136 ServiceBaseInfoModel(service_id=self.ns_inst_id,
137 service_name=ns_inst.name,
139 description=ns_inst.description,
141 status=ns_inst.status,
143 create_time=int(time.time() * 1000)).save()
145 if config.WORKFLOW_OPTION == "wso2":
146 service_tpl = get_servicetemplate(ns_inst.nsd_id)
147 DefPkgMappingModel(service_id=self.ns_inst_id,
148 service_def_id=service_tpl['csarId'],
149 template_name=service_tpl['templateName'],
150 template_id=service_tpl['serviceTemplateId']).save()
152 for key, val in list(self.req_data['additionalParamForNs'].items()):
153 InputParamMappingModel(service_id=self.ns_inst_id,
155 input_value=val).save()
157 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
158 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
159 vnffginstid=str(uuid.uuid4()),
160 nsinstid=self.ns_inst_id,
161 endpointnumber=0).save()
165 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
166 if config.WORKFLOW_OPTION == "wso2":
167 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
168 elif config.WORKFLOW_OPTION == "activiti":
169 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
170 elif config.WORKFLOW_OPTION == "grapflow":
171 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
173 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
175 except Exception as e:
176 logger.error(traceback.format_exc())
177 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.args[0]))
178 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.args[0])
179 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
180 return dict(data={'error': e.args[0]}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
    """Start NS instantiation through the WSO2 workflow engine.

    Resolves the service template id for ``ns_inst.nsd_id`` and the process
    id of its ``'init'`` plan, then submits ``plan_input`` via
    ``workflow_run``.

    :param job_id: job whose progress is set to 10 once the engine answered.
    :param ns_inst: NS instance record; only ``nsd_id`` is read here.
    :param plan_input: sent to the engine as ``params.planInput``.
    :param occ_id: operation occurrence id, echoed back on success.
    :return: dict with ``data`` and ``status``; on success also ``occ_id``.
    """
    servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
    process_id = get_process_id('init', servicetemplate_id)
    data = {"processId": process_id, "params": {"planInput": plan_input}}
    logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))

    ret = workflow_run(data)
    logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
    JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
        self.ns_inst_id, ret.get('status')))
    if ret.get('status') == 1:
        # Report the occurrence id on success, as the activiti and built-in
        # starters in this file do.
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
    return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_activiti_workflow(self, job_id, plan_input, occ_id):
    """Start NS instantiation through the activiti workflow engine.

    Uses the first deployed plan found in ``WFPlanModel`` and submits the
    request with ``activiti.exec_workflow``.

    :param job_id: job whose progress is set to 10 once the engine answered.
    :param plan_input: plan input forwarded to the engine.
    :param occ_id: operation occurrence id, echoed back on success.
    :return: dict with ``data`` and ``status``; on success also ``occ_id``.
    :raises NSLCMException: when no workflow plan has been deployed.
    """
    plans = WFPlanModel.objects.filter()
    # Only fail when there really is no plan; plans[0] is used below.
    if not plans:
        raise NSLCMException("No plan is found, you should deploy plan first!")
    data = {
        "processId": plans[0].process_id,
        # NOTE(review): "params" mirrors the payload built by the WSO2
        # starter in this file - confirm against the activiti API.
        "params": {"planInput": plan_input}
    }
    ret = activiti.exec_workflow(data)
    logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
    JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
        self.ns_inst_id, ret.get('status')))
    if ret.get('status') == 1:
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
    return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_buildin_workflow(self, job_id, plan_input, occ_id):
    """Start NS instantiation with the built-in workflow on a new thread.

    The job progress is set to 10, a ``BuildInWorkflowThread`` is started
    with ``plan_input`` and ``occ_id``, and a 200 response is returned
    without waiting for the thread.

    :return: dict with ``data`` (the job id), ``status`` and ``occ_id``.
    """
    started_msg = 'NS inst(%s) buildin workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, started_msg)

    worker = BuildInWorkflowThread(plan_input, occ_id)
    worker.start()

    response = {'data': {'jobId': job_id}}
    response['status'] = status.HTTP_200_OK
    response['occ_id'] = occ_id
    return response
def start_buildin_grapflow(self, job_id, plan_input, occ_id):
    """Start NS instantiation with the built-in graph workflow.

    The job progress is set to 10 and ``run_ns_instantiate`` is called
    directly in the calling thread before the 200 response is built.

    :return: dict with ``data`` (the job id), ``status`` and ``occ_id``.
    """
    started_msg = 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, started_msg)

    run_ns_instantiate(plan_input, occ_id)

    response = {'data': {'jobId': job_id}}
    response['status'] = status.HTTP_200_OK
    response['occ_id'] = occ_id
    return response
225 def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
226 for location in location_constraints:
227 if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
228 # if 'vimId' in location['locationConstraints']:
229 if len(location['locationConstraints']) == 1:
230 cloud_owner = location['locationConstraints']["vimId"].split('_')[0]
231 cloud_regionid = location['locationConstraints']["vimId"].split('_')[1]
232 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
233 elif len(location['locationConstraints']) == 2:
234 cloud_owner = location['locationConstraints']["cloudOwner"]
235 cloud_regionid = location['locationConstraints']["cloudRegionId"]
236 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
240 raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
@staticmethod
def set_vl_vim_id(vim_id, location_constraints, plan_dict):
    """Store a VIM in ``properties.location_info.vimid`` of every VL.

    For each entry of ``plan_dict["vls"]`` the VIM is taken from the
    location constraint of a VNF that lists the VL in its ``dependencies``;
    otherwise ``vim_id`` is used. ``plan_dict`` is modified in place and
    nothing is returned.

    :param vim_id: default VIM dict; may be empty.
    :param location_constraints: iterable of per-VNF constraint dicts, read
        in the ``vimId`` (one entry) or ``cloudOwner``/``cloudRegionId``
        (two entries) shape.
    :param plan_dict: decoded NSD plan.
    :raises NSLCMException: when a VL ends up without any VIM.
    """
    if "vls" not in plan_dict:
        logger.debug("No vl is found in nsd.")
        return

    # VL id -> descriptor id of a VNF depending on that VL (last one wins).
    vl_vnf = {}
    for vnf in ignore_case_get(plan_dict, "vnfs"):
        if "dependencies" in vnf:
            for depend in vnf["dependencies"]:
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']

    # VNF profile id -> VIM taken from its location constraint.
    vnf_vim = {}
    for location in location_constraints:
        if "vnfProfileId" in location:
            vnfd_id = location["vnfProfileId"]
            constraint = location["locationConstraints"]
            if len(constraint) == 1:
                parts = constraint["vimId"].split('_')
                # NOTE(review): this rebinds vim_id, so the default used
                # further down becomes the last constraint's VIM - confirm
                # that this is intended.
                vim_id = {"cloud_owner": parts[0], "cloud_regionid": parts[1]}
                vnf_vim[vnfd_id] = vim_id
            elif len(constraint) == 2:
                vim_id = {"cloud_owner": constraint["cloudOwner"],
                          "cloud_regionid": constraint["cloudRegionId"]}
                vnf_vim[vnfd_id] = vim_id

    for vl in plan_dict["vls"]:
        # assumes ignore_case_get yields a falsy value for a missing key
        # - TODO confirm against lcm.pub.utils.values
        vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
        vimid = ignore_case_get(vnf_vim, vnfdid)
        if not vimid:
            vimid = vim_id
        if not vimid:
            raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
        if "location_info" not in vl["properties"]:
            vl["properties"]["location_info"] = {}
        vl["properties"]["location_info"]["vimid"] = vimid
280 def get_model_count(context):
281 data = json.JSONDecoder().decode(context)
282 vls = len(data.get('vls', []))
283 sfcs = len(data.get('fps', []))
284 vnfs = len(data.get('vnfs', []))
285 pnfs = len(data.get('pnfs', []))
286 return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs), 'pnfCount': str(pnfs)}
288 def init_pnf_para(self, plan_dict):
289 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
290 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
291 logger.debug("addpnfData ; %s" % pnfs_in_input)
292 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
294 for pnf in pnfs_in_input:
295 for pnfd in pnfs_in_nsd:
296 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
298 pnf["nsInstances"] = self.ns_inst_id