1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
39 logger = logging.getLogger(__name__)
42 class BuildInWorkflowThread(Thread):
43 def __init__(self, plan_input, occ_id):
45 self.plan_input = plan_input
49 build_in.run_ns_instantiate(self.plan_input, self.occ_id)
52 class InstantNSService(object):
53 def __init__(self, ns_inst_id, plan_content):
54 self.ns_inst_id = ns_inst_id
55 self.req_data = plan_content
58 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
59 occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
62 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
63 logger.debug('req_data=%s' % self.req_data)
64 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
68 if 'additionalParamForNs' in self.req_data:
69 for key, val in self.req_data['additionalParamForNs'].items():
70 input_parameters.append({"key": key, "value": val})
72 if 'location' in self.req_data['additionalParamForNs']:
73 cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
74 cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
75 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
76 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
78 params_json = json.JSONEncoder().encode({})
80 location_constraints = []
81 if 'locationConstraints' in self.req_data:
82 location_constraints = self.req_data['locationConstraints']
84 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
85 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
86 logger.debug('tosca plan dest: %s' % dst_plan)
87 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
88 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
91 plan_dict = json.JSONDecoder().decode(dst_plan)
92 for vnf in ignore_case_get(plan_dict, "vnfs"):
93 vnfd_id = vnf['properties']['id']
94 vnfm_type_temp = vnf['properties'].get("vnfm_info", "undefined")
95 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type_temp)
96 vnfm_type = vnfm_type_temp
97 if isinstance(vnfm_type_temp, list):
98 vnfm_type = vnfm_type_temp[0]
99 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
101 if isinstance(vimid, dict):
102 s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
103 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
106 "vnfProfileId": vnf["vnf_id"],
108 "properties": json.JSONEncoder().encode(vnf['properties']),
110 "vnfmInstanceId": vnfm_info["vnfmId"],
111 "vnfmType": vnfm_type,
112 "inputs": params_json
116 self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
117 dst_plan = json.JSONEncoder().encode(plan_dict)
118 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
119 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
121 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
123 vnf_params_json = json.JSONEncoder().encode(params_vnf)
126 'nsInstanceId': self.ns_inst_id,
127 'object_context': dst_plan,
128 'object_additionalParamForNs': params_json,
129 'object_additionalParamForVnf': vnf_params_json,
130 'object_additionalParamForPnf': pnf_params_json
132 plan_input.update(**self.get_model_count(dst_plan))
134 if 'additionalParamForNs' in self.req_data:
135 plan_input["sdnControllerId"] = ignore_case_get(
136 self.req_data['additionalParamForNs'], "sdncontroller")
138 ServiceBaseInfoModel(service_id=self.ns_inst_id,
139 service_name=ns_inst.name,
141 description=ns_inst.description,
143 status=ns_inst.status,
145 create_time=int(time.time() * 1000)).save()
147 if config.WORKFLOW_OPTION == "wso2":
148 service_tpl = get_servicetemplate(ns_inst.nsd_id)
149 DefPkgMappingModel(service_id=self.ns_inst_id,
150 service_def_id=service_tpl['csarId'],
151 template_name=service_tpl['templateName'],
152 template_id=service_tpl['serviceTemplateId']).save()
154 for key, val in self.req_data['additionalParamForNs'].items():
155 InputParamMappingModel(service_id=self.ns_inst_id,
157 input_value=val).save()
159 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
160 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
161 vnffginstid=str(uuid.uuid4()),
162 nsinstid=self.ns_inst_id,
163 endpointnumber=0).save()
167 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
168 if config.WORKFLOW_OPTION == "wso2":
169 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
170 elif config.WORKFLOW_OPTION == "activiti":
171 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
172 elif config.WORKFLOW_OPTION == "grapflow":
173 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
175 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
177 except Exception as e:
178 logger.error(traceback.format_exc())
179 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
180 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
181 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
182 return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
184 def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
186 servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
187 process_id = get_process_id('init', servicetemplate_id)
188 data = {"processId": process_id, "params": {"planInput": plan_input}}
189 logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
191 ret = workflow_run(data)
192 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
193 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
194 self.ns_inst_id, ret.get('status')))
195 if ret.get('status') == 1:
196 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
197 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
199 def start_activiti_workflow(self, job_id, plan_input, occ_id):
201 plans = WFPlanModel.objects.filter()
203 raise NSLCMException("No plan is found, you should deploy plan first!")
205 "processId": plans[0].process_id,
208 ret = activiti.exec_workflow(data)
209 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
210 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
211 self.ns_inst_id, ret.get('status')))
212 if ret.get('status') == 1:
213 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
214 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
216 def start_buildin_workflow(self, job_id, plan_input, occ_id):
217 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
218 BuildInWorkflowThread(plan_input, occ_id).start()
219 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
221 def start_buildin_grapflow(self, job_id, plan_input, occ_id):
222 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
223 run_ns_instantiate(plan_input, occ_id)
224 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
227 def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
228 for location in location_constraints:
229 if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
230 # if 'vimId' in location['locationConstraints']:
231 if len(location['locationConstraints']) == 1:
232 cloud_owner = location['locationConstraints']["vimId"].split('_')[0]
233 cloud_regionid = location['locationConstraints']["vimId"].split('_')[1]
234 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
235 elif len(location['locationConstraints']) == 2:
236 cloud_owner = location['locationConstraints']["cloudOwner"]
237 cloud_regionid = location['locationConstraints']["cloudRegionId"]
238 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
242 raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
245 def set_vl_vim_id(vim_id, location_constraints, plan_dict):
246 if "vls" not in plan_dict:
247 logger.debug("No vl is found in nsd.")
250 for vnf in ignore_case_get(plan_dict, "vnfs"):
251 if "dependencies" in vnf:
252 for depend in vnf["dependencies"]:
253 vl_vnf[depend["vl_id"]] = vnf['properties']['id']
256 for location in location_constraints:
257 if "vnfProfileId" in location:
258 vnfd_id = location["vnfProfileId"]
259 # if 'vimId' in location["locationConstraints"]:
260 if len(location['locationConstraints']) == 1:
261 cloud_owner = location["locationConstraints"]["vimId"].split('_')[0]
262 cloud_regionid = location["locationConstraints"]["vimId"].split('_')[1]
263 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
264 vnf_vim[vnfd_id] = vim_id
265 elif len(location['locationConstraints']) == 2:
266 cloud_owner = location["locationConstraints"]["cloudOwner"]
267 cloud_regionid = location["locationConstraints"]["cloudRegionId"]
268 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
269 vnf_vim[vnfd_id] = vim_id
270 for vl in plan_dict["vls"]:
271 vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
272 vimid = ignore_case_get(vnf_vim, vnfdid)
276 raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
277 if "location_info" not in vl["properties"]:
278 vl["properties"]["location_info"] = {}
279 vl["properties"]["location_info"]["vimid"] = vimid
282 def get_model_count(context):
283 data = json.JSONDecoder().decode(context)
284 vls = len(data.get('vls', []))
285 sfcs = len(data.get('fps', []))
286 vnfs = len(data.get('vnfs', []))
287 pnfs = len(data.get('pnfs', []))
288 return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs), 'pnfCount': str(pnfs)}
290 def init_pnf_para(self, plan_dict):
291 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
292 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
293 logger.debug("addpnfData ; %s" % pnfs_in_input)
294 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
296 for pnf in pnfs_in_input:
297 for pnfd in pnfs_in_nsd:
298 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
300 pnf["nsInstances"] = self.ns_inst_id