1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38 from lcm.ns.enum import NS_INST_STATUS
40 logger = logging.getLogger(__name__)
class BuildInWorkflowThread(Thread):
    """Background thread that runs the built-in NS instantiation workflow.

    ``InstantNSService.start_buildin_workflow`` creates an instance and calls
    ``start()`` on it, so the workflow executes in ``run()`` on a separate
    thread rather than in the caller.
    """

    def __init__(self, plan_input, occ_id):
        """Store the arguments that ``run()`` forwards to the workflow.

        :param plan_input: workflow input, passed through unchanged.
        :param occ_id: operation occurrence id, passed through unchanged.
        """
        # Thread has to be initialised, otherwise start() raises RuntimeError.
        super(BuildInWorkflowThread, self).__init__()
        self.plan_input = plan_input
        # run() reads self.occ_id, so it must be stored here.
        self.occ_id = occ_id

    def run(self):
        """Thread entry point: execute the built-in instantiate workflow."""
        build_in.run_ns_instantiate(self.plan_input, self.occ_id)
53 class InstantNSService(object):
54 def __init__(self, ns_inst_id, plan_content):
55 self.ns_inst_id = ns_inst_id
56 self.req_data = plan_content
# Body of the InstantNSService method that drives NS instantiation.
# NOTE(review): the method's `def` line is not present in this listing, and
# neither are several interior lines: the `try:` that pairs with the `except`
# near the end, and the statements that first bind `input_parameters`,
# `vim_id`, `params_vnf` and `plan_input`.

# Create the job record and the LCM operation occurrence, then mark the NS
# instance as INSTANTIATING.
59 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
60 occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
61 NSInstModel.objects.filter(id=self.ns_inst_id).update(status=NS_INST_STATUS.INSTANTIATING)
64 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
65 logger.debug('req_data=%s' % self.req_data)
66 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
# Flatten additionalParamForNs into {"key", "value"} pairs; they are handed to
# the NSD parser further down.
70 if 'additionalParamForNs' in self.req_data:
71 for key, val in list(self.req_data['additionalParamForNs'].items()):
72 input_parameters.append({"key": key, "value": val})
# An optional 'location' value is split on '_' into cloud owner (first part)
# and cloud region id (second part); the result is the VIM passed to
# get_vnf_vim_id / set_vl_vim_id below.
74 if 'location' in self.req_data['additionalParamForNs']:
75 cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
76 cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
77 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
78 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
# NOTE(review): presumably the fallback when additionalParamForNs is absent;
# the branch keyword for this line is not present in this listing.
80 params_json = json.JSONEncoder().encode({})
# Missing or empty locationConstraints is treated as an empty list.
82 location_constraints = [] if not self.req_data.get('locationConstraints')\
83 else self.req_data['locationConstraints']
84 vnf_vim = self.get_vnf_vim_info(location_constraints)
# Parse the NSD through the catalog and store the parsed model on the NS
# instance record.
86 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
87 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
88 logger.debug('tosca plan dest: %s' % dst_plan)
89 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
90 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
# The parsed plan is JSON text; decode it to walk its VNFs.
93 plan_dict = json.JSONDecoder().decode(dst_plan)
94 for vnf in ignore_case_get(plan_dict, "vnfs"):
95 vnfd_id = vnf['properties']['id']
96 vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
97 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
# vnfm_info may be a list; only its first element is used.
98 if isinstance(vnfm_type, list):
99 vnfm_type = vnfm_type[0]
# Resolve the VIM for this VNF and select a VNFM using the
# "<cloud_owner>_<cloud_regionid>" string form of that VIM.
100 vimid = self.get_vnf_vim_id(vim_id, vnf_vim, vnfd_id)
101 s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
102 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
# NOTE(review): the five entries below belong to a dict literal whose opening
# and closing lines (and any further entries) are not present in this listing.
105 "vnfProfileId": vnf["vnf_id"],
107 "properties": json.JSONEncoder().encode(vnf['properties']),
109 "vnfmInstanceId": vnfm_info["vnfmId"],
110 "vnfmType": vnfm_type,
111 "inputs": params_json
# Write the VIM choice into the plan's VLs, then re-encode and store the plan.
115 self.set_vl_vim_id(vim_id, vnf_vim, plan_dict)
116 dst_plan = json.JSONEncoder().encode(plan_dict)
117 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
118 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
120 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
122 vnf_params_json = json.JSONEncoder().encode(params_vnf)
# NOTE(review): entries of the workflow input literal; the line that binds
# `plan_input` (and any entries before 'nsInstanceId') is not present here.
125 'nsInstanceId': self.ns_inst_id,
126 'object_context': dst_plan,
127 'object_additionalParamForNs': params_json,
128 'object_additionalParamForVnf': vnf_params_json,
129 'object_additionalParamForPnf': pnf_params_json
# Merge vlCount / sfcCount / vnfCount / pnfCount into the workflow input.
131 plan_input.update(**self.get_model_count(dst_plan))
133 if 'additionalParamForNs' in self.req_data:
134 plan_input["sdnControllerId"] = ignore_case_get(
135 self.req_data['additionalParamForNs'], "sdncontroller")
# Persist a service base-info row for this NS instance; create_time is the
# current time in milliseconds.
# NOTE(review): some keyword arguments of this call are not present in this listing.
137 ServiceBaseInfoModel(service_id=self.ns_inst_id,
138 service_name=ns_inst.name,
140 description=ns_inst.description,
142 status=ns_inst.status,
144 create_time=int(time.time() * 1000)).save()
# For the wso2 engine, record which service template the instance maps to.
146 if config.WORKFLOW_OPTION == "wso2":
147 service_tpl = get_servicetemplate(ns_inst.nsd_id)
148 DefPkgMappingModel(service_id=self.ns_inst_id,
149 service_def_id=service_tpl['csarId'],
150 template_name=service_tpl['templateName'],
151 template_id=service_tpl['serviceTemplateId']).save()
# NOTE(review): with indentation lost it is not visible whether the two loops
# below belong to the wso2 branch. The first reads additionalParamForNs
# without checking that the key exists.
153 for key, val in list(self.req_data['additionalParamForNs'].items()):
154 InputParamMappingModel(service_id=self.ns_inst_id,
156 input_value=val).save()
# One VNFFG instance row per VNFFG in the plan, each with a fresh UUID.
158 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
159 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
160 vnffginstid=str(uuid.uuid4()),
161 nsinstid=self.ns_inst_id,
162 endpointnumber=0).save()
# Dispatch to the workflow engine selected by config.WORKFLOW_OPTION.
166 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
167 if config.WORKFLOW_OPTION == "wso2":
168 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
169 elif config.WORKFLOW_OPTION == "activiti":
170 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
171 elif config.WORKFLOW_OPTION == "grapflow":
172 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
# NOTE(review): presumably the default for any other option value; its branch
# keyword is not present in this listing.
174 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
# Failure path: log the traceback, mark the operation occurrence FAILED, set
# the job status to 255, call build_in.post_deal with "false" and return an
# HTTP 500 result dict carrying the error.
# NOTE(review): e.args[0] raises IndexError for an exception constructed
# without arguments.
176 except Exception as e:
177 logger.error(traceback.format_exc())
178 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.args[0]))
179 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.args[0])
180 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
181 build_in.post_deal(self.ns_inst_id, "false")
182 return dict(data={'error': e.args[0]}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
    """Start the 'init' process of the NS service template via ``workflow_run``.

    :param job_id: job whose status is set to 10 once the call has returned.
    :param ns_inst: NS instance record; its ``nsd_id`` selects the template.
    :param plan_input: workflow input, sent as ``params.planInput``.
    :param occ_id: operation occurrence id, included in the success result.
    :return: result dict with ``data`` and ``status``; HTTP 200 (plus
        ``occ_id``) when the engine reports status 1, HTTP 500 with the
        engine's ``message`` otherwise.
    """
    template_id = get_servicetemplate_id(ns_inst.nsd_id)
    init_process = get_process_id('init', template_id)
    request = {"processId": init_process, "params": {"planInput": plan_input}}
    logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, request))

    result = workflow_run(request)
    logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, result))
    progress_msg = 'NS inst(%s) wso2 workflow started: %s' % (
        self.ns_inst_id, result.get('status'))
    JobUtil.add_job_status(job_id, 10, progress_msg)

    # Anything other than status 1 is reported back as a server error.
    if result.get('status') != 1:
        return dict(data={'error': result['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
# Start NS instantiation through the activiti workflow adapter, using the
# process id of the first WFPlanModel row.
# Returns a result dict: HTTP 200 with the job id and occ_id when the adapter
# reports status 1, otherwise HTTP 500 with the adapter's 'message'.
199 def start_activiti_workflow(self, job_id, plan_input, occ_id):
201 plans = WFPlanModel.objects.filter()
# NOTE(review): the condition guarding this raise is not present in this
# listing; presumably it fires when no plan rows exist.
203 raise NSLCMException("No plan is found, you should deploy plan first!")
# NOTE(review): only this entry of the request literal is visible. The line
# that binds `data` and the entry that carries `plan_input` are not present in
# this listing.
205 "processId": plans[0].process_id,
208 ret = activiti.exec_workflow(data)
209 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
# Record progress 10 on the job together with the status the adapter returned.
210 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
211 self.ns_inst_id, ret.get('status')))
212 if ret.get('status') == 1:
213 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
214 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_buildin_workflow(self, job_id, plan_input, occ_id):
    """Run the built-in workflow on a background thread and return at once.

    :param job_id: job whose status is set to 10 before the thread starts.
    :param plan_input: workflow input handed to the worker thread.
    :param occ_id: operation occurrence id handed to the worker thread and
        included in the result.
    :return: HTTP 200 result dict carrying the job id and ``occ_id``.
    """
    progress_msg = 'NS inst(%s) buildin workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, progress_msg)

    worker = BuildInWorkflowThread(plan_input, occ_id)
    worker.start()

    return {'data': {'jobId': job_id}, 'status': status.HTTP_200_OK, 'occ_id': occ_id}
def start_buildin_grapflow(self, job_id, plan_input, occ_id):
    """Run the graph-flow instantiate routine in the calling thread.

    :param job_id: job whose status is set to 10 before the flow runs.
    :param plan_input: workflow input passed to ``run_ns_instantiate``.
    :param occ_id: operation occurrence id passed to ``run_ns_instantiate``
        and included in the result.
    :return: HTTP 200 result dict carrying the job id and ``occ_id``.
    """
    progress_msg = 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, progress_msg)

    # Unlike the threaded built-in workflow, this call is made inline.
    run_ns_instantiate(plan_input, occ_id)

    return {'data': {'jobId': job_id}, 'status': status.HTTP_200_OK, 'occ_id': occ_id}
227 def get_vnf_vim_id(vim_id, vnf_vim, vnfdid):
228 new_vim_id = vnf_vim.get(vnfdid) if vnf_vim.get(vnfdid) else vim_id
231 raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
234 def get_vnf_vim_info(location_constraints):
236 for location in location_constraints:
237 if "vnfProfileId" in location:
238 vnfd_id = location["vnfProfileId"]
239 if len(location['locationConstraints']) == 1:
240 cloud_owner = location["locationConstraints"]["vimId"].split('_')[0]
241 cloud_regionid = location["locationConstraints"]["vimId"].split('_')[1]
242 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
243 vnf_vim[vnfd_id] = vim_id
244 elif len(location['locationConstraints']) == 2:
245 cloud_owner = location["locationConstraints"]["cloudOwner"]
246 cloud_regionid = location["locationConstraints"]["cloudRegionId"]
247 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
248 vnf_vim[vnfd_id] = vim_id
@staticmethod
def set_vl_vim_id(vim_id, vnf_vim, plan_dict):
    """Record on every VL of the plan which VIM it should use.

    Declared static because it takes no ``self`` yet is invoked as
    ``self.set_vl_vim_id(...)``. Mutates ``plan_dict`` in place and returns
    ``None``.

    :param vim_id: default VIM, used when no VNF-specific VIM is found.
    :param vnf_vim: mapping of VNFD id to VIM.
    :param plan_dict: decoded plan; each item of ``plan_dict["vls"]`` gets
        ``properties.location_info.vimid`` set.
    """
    if "vls" not in plan_dict:
        logger.debug("No vl is found in nsd.")
        # Nothing to annotate; continuing would raise KeyError on "vls".
        return

    # VL id -> VNFD id of a VNF listing that VL in its dependencies
    # (when several VNFs do, the last one seen wins).
    vl_vnf = {}
    for vnf in ignore_case_get(plan_dict, "vnfs"):
        if "dependencies" in vnf:
            for depend in vnf["dependencies"]:
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']

    for vl in plan_dict["vls"]:
        vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
        # NOTE(review): falling back to the default VIM mirrors
        # get_vnf_vim_id and is the only use of the vim_id argument;
        # confirm this is the intended behaviour.
        vimid = ignore_case_get(vnf_vim, vnfdid) or vim_id
        vl["properties"].setdefault("location_info", {})["vimid"] = vimid
271 def get_model_count(context):
272 data = json.JSONDecoder().decode(context)
273 vls = len(data.get('vls', []))
274 sfcs = len(data.get('fps', []))
275 vnfs = len(data.get('vnfs', []))
276 pnfs = len(data.get('pnfs', []))
277 return {'vlCount': vls, 'sfcCount': sfcs, 'vnfCount': vnfs, 'pnfCount': pnfs}
279 def init_pnf_para(self, plan_dict):
280 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData", [])
281 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs", [])
282 logger.debug("addpnfData ; %s" % pnfs_in_input)
283 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
285 for pnf in pnfs_in_input:
286 for pnfd in pnfs_in_nsd:
287 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
289 pnf["nsInstances"] = self.ns_inst_id