1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
39 logger = logging.getLogger(__name__)
42 class BuildInWorkflowThread(Thread):
43 def __init__(self, plan_input, occ_id):
45 self.plan_input = plan_input
49 build_in.run_ns_instantiate(self.plan_input, self.occ_id)
52 class InstantNSService(object):
53 def __init__(self, ns_inst_id, plan_content):
54 self.ns_inst_id = ns_inst_id
55 self.req_data = plan_content
58 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
59 occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
62 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
63 logger.debug('req_data=%s' % self.req_data)
64 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
68 if 'additionalParamForNs' in self.req_data:
69 for key, val in list(self.req_data['additionalParamForNs'].items()):
70 input_parameters.append({"key": key, "value": val})
72 if 'location' in self.req_data['additionalParamForNs']:
73 cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
74 cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
75 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
76 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
78 params_json = json.JSONEncoder().encode({})
80 location_constraints = [] if not self.req_data.get('locationConstraints')\
81 else self.req_data['locationConstraints']
82 vnf_vim = self.get_vnf_vim_info(location_constraints)
84 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
85 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
86 logger.debug('tosca plan dest: %s' % dst_plan)
87 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
88 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
91 plan_dict = json.JSONDecoder().decode(dst_plan)
92 for vnf in ignore_case_get(plan_dict, "vnfs"):
93 vnfd_id = vnf['properties']['id']
94 vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
95 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
96 if isinstance(vnfm_type, list):
97 vnfm_type = vnfm_type[0]
98 vimid = self.get_vnf_vim_id(vim_id, vnf_vim, vnfd_id)
99 s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
100 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
103 "vnfProfileId": vnf["vnf_id"],
105 "properties": json.JSONEncoder().encode(vnf['properties']),
107 "vnfmInstanceId": vnfm_info["vnfmId"],
108 "vnfmType": vnfm_type,
109 "inputs": params_json
113 self.set_vl_vim_id(vim_id, vnf_vim, plan_dict)
114 dst_plan = json.JSONEncoder().encode(plan_dict)
115 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
116 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
118 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
120 vnf_params_json = json.JSONEncoder().encode(params_vnf)
123 'nsInstanceId': self.ns_inst_id,
124 'object_context': dst_plan,
125 'object_additionalParamForNs': params_json,
126 'object_additionalParamForVnf': vnf_params_json,
127 'object_additionalParamForPnf': pnf_params_json
129 plan_input.update(**self.get_model_count(dst_plan))
131 if 'additionalParamForNs' in self.req_data:
132 plan_input["sdnControllerId"] = ignore_case_get(
133 self.req_data['additionalParamForNs'], "sdncontroller")
135 ServiceBaseInfoModel(service_id=self.ns_inst_id,
136 service_name=ns_inst.name,
138 description=ns_inst.description,
140 status=ns_inst.status,
142 create_time=int(time.time() * 1000)).save()
144 if config.WORKFLOW_OPTION == "wso2":
145 service_tpl = get_servicetemplate(ns_inst.nsd_id)
146 DefPkgMappingModel(service_id=self.ns_inst_id,
147 service_def_id=service_tpl['csarId'],
148 template_name=service_tpl['templateName'],
149 template_id=service_tpl['serviceTemplateId']).save()
151 for key, val in list(self.req_data['additionalParamForNs'].items()):
152 InputParamMappingModel(service_id=self.ns_inst_id,
154 input_value=val).save()
156 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
157 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
158 vnffginstid=str(uuid.uuid4()),
159 nsinstid=self.ns_inst_id,
160 endpointnumber=0).save()
164 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
165 if config.WORKFLOW_OPTION == "wso2":
166 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
167 elif config.WORKFLOW_OPTION == "activiti":
168 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
169 elif config.WORKFLOW_OPTION == "grapflow":
170 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
172 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
174 except Exception as e:
175 logger.error(traceback.format_exc())
176 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.args[0]))
177 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.args[0])
178 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
179 return dict(data={'error': e.args[0]}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
181 def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
183 servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
184 process_id = get_process_id('init', servicetemplate_id)
185 data = {"processId": process_id, "params": {"planInput": plan_input}}
186 logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
188 ret = workflow_run(data)
189 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
190 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
191 self.ns_inst_id, ret.get('status')))
192 if ret.get('status') == 1:
193 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
194 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
196 def start_activiti_workflow(self, job_id, plan_input, occ_id):
198 plans = WFPlanModel.objects.filter()
200 raise NSLCMException("No plan is found, you should deploy plan first!")
202 "processId": plans[0].process_id,
205 ret = activiti.exec_workflow(data)
206 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
207 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
208 self.ns_inst_id, ret.get('status')))
209 if ret.get('status') == 1:
210 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
211 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
213 def start_buildin_workflow(self, job_id, plan_input, occ_id):
214 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
215 BuildInWorkflowThread(plan_input, occ_id).start()
216 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
218 def start_buildin_grapflow(self, job_id, plan_input, occ_id):
219 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
220 run_ns_instantiate(plan_input, occ_id)
221 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
224 def get_vnf_vim_id(vim_id, vnf_vim, vnfdid):
225 new_vim_id = vnf_vim.get(vnfdid) if vnf_vim.get(vnfdid) else vim_id
228 raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
231 def get_vnf_vim_info(location_constraints):
233 for location in location_constraints:
234 if "vnfProfileId" in location:
235 vnfd_id = location["vnfProfileId"]
236 if len(location['locationConstraints']) == 1:
237 cloud_owner = location["locationConstraints"]["vimId"].split('_')[0]
238 cloud_regionid = location["locationConstraints"]["vimId"].split('_')[1]
239 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
240 vnf_vim[vnfd_id] = vim_id
241 elif len(location['locationConstraints']) == 2:
242 cloud_owner = location["locationConstraints"]["cloudOwner"]
243 cloud_regionid = location["locationConstraints"]["cloudRegionId"]
244 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
245 vnf_vim[vnfd_id] = vim_id
249 def set_vl_vim_id(vim_id, vnf_vim, plan_dict):
250 if "vls" not in plan_dict:
251 logger.debug("No vl is found in nsd.")
254 for vnf in ignore_case_get(plan_dict, "vnfs"):
255 if "dependencies" in vnf:
256 for depend in vnf["dependencies"]:
257 vl_vnf[depend["vl_id"]] = vnf['properties']['id']
258 for vl in plan_dict["vls"]:
259 vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
260 vimid = ignore_case_get(vnf_vim, vnfdid)
263 if "location_info" not in vl["properties"]:
264 vl["properties"]["location_info"] = {}
265 vl["properties"]["location_info"]["vimid"] = vimid
268 def get_model_count(context):
269 data = json.JSONDecoder().decode(context)
270 vls = len(data.get('vls', []))
271 sfcs = len(data.get('fps', []))
272 vnfs = len(data.get('vnfs', []))
273 pnfs = len(data.get('pnfs', []))
274 return {'vlCount': vls, 'sfcCount': sfcs, 'vnfCount': vnfs, 'pnfCount': pnfs}
276 def init_pnf_para(self, plan_dict):
277 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData", [])
278 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs", [])
279 logger.debug("addpnfData ; %s" % pnfs_in_input)
280 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
282 for pnf in pnfs_in_input:
283 for pnfd in pnfs_in_nsd:
284 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
286 pnf["nsInstances"] = self.ns_inst_id