1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38 from lcm.ns.enum import NS_INST_STATUS
40 logger = logging.getLogger(__name__)
43 class BuildInWorkflowThread(Thread):
44 def __init__(self, plan_input, occ_id):
46 self.plan_input = plan_input
50 build_in.run_ns_instantiate(self.plan_input, self.occ_id)
53 class InstantNSService(object):
54 def __init__(self, ns_inst_id, plan_content):
55 self.ns_inst_id = ns_inst_id
56 self.req_data = plan_content
59 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
60 occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
61 NSInstModel.objects.filter(id=self.ns_inst_id).update(status=NS_INST_STATUS.INSTANTIATING)
64 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
65 logger.debug('req_data=%s' % self.req_data)
66 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
70 if 'additionalParamForNs' in self.req_data:
71 for key, val in list(self.req_data['additionalParamForNs'].items()):
72 input_parameters.append({"key": key, "value": val})
74 if 'location' in self.req_data['additionalParamForNs']:
75 cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
76 cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
77 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
78 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
80 params_json = json.JSONEncoder().encode({})
82 location_constraints = [] if not self.req_data.get('locationConstraints')\
83 else self.req_data['locationConstraints']
84 vnf_vim = self.get_vnf_vim_info(location_constraints)
86 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
87 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
88 logger.debug('tosca plan dest: %s' % dst_plan)
89 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
90 sdc_run_catalog.modify_nsd_state(ns_inst.nspackage_id)
91 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
94 plan_dict = json.JSONDecoder().decode(dst_plan)
95 for vnf in ignore_case_get(plan_dict, "vnfs"):
96 vnfd_id = vnf['properties']['id']
97 vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
98 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
99 if isinstance(vnfm_type, list):
100 vnfm_type = vnfm_type[0]
101 vimid = self.get_vnf_vim_id(vim_id, vnf_vim, vnfd_id)
102 s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
103 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
106 "vnfProfileId": vnf["vnf_id"],
108 "properties": json.JSONEncoder().encode(vnf['properties']),
110 "vnfmInstanceId": vnfm_info["vnfmId"],
111 "vnfmType": vnfm_type,
112 "inputs": params_json
116 self.set_vl_vim_id(vim_id, vnf_vim, plan_dict)
117 dst_plan = json.JSONEncoder().encode(plan_dict)
118 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
119 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
121 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
123 vnf_params_json = json.JSONEncoder().encode(params_vnf)
126 'nsInstanceId': self.ns_inst_id,
127 'object_context': dst_plan,
128 'object_additionalParamForNs': params_json,
129 'object_additionalParamForVnf': vnf_params_json,
130 'object_additionalParamForPnf': pnf_params_json
132 plan_input.update(**self.get_model_count(dst_plan))
134 if 'additionalParamForNs' in self.req_data:
135 plan_input["sdnControllerId"] = ignore_case_get(
136 self.req_data['additionalParamForNs'], "sdncontroller")
138 ServiceBaseInfoModel(service_id=self.ns_inst_id,
139 service_name=ns_inst.name,
141 description=ns_inst.description,
143 status=ns_inst.status,
145 create_time=int(time.time() * 1000)).save()
147 if config.WORKFLOW_OPTION == "wso2":
148 service_tpl = get_servicetemplate(ns_inst.nsd_id)
149 DefPkgMappingModel(service_id=self.ns_inst_id,
150 service_def_id=service_tpl['csarId'],
151 template_name=service_tpl['templateName'],
152 template_id=service_tpl['serviceTemplateId']).save()
154 for key, val in list(self.req_data['additionalParamForNs'].items()):
155 InputParamMappingModel(service_id=self.ns_inst_id,
157 input_value=val).save()
159 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
160 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
161 vnffginstid=str(uuid.uuid4()),
162 nsinstid=self.ns_inst_id,
163 endpointnumber=0).save()
167 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
168 if config.WORKFLOW_OPTION == "wso2":
169 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
170 elif config.WORKFLOW_OPTION == "activiti":
171 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
172 elif config.WORKFLOW_OPTION == "grapflow":
173 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
175 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
177 except Exception as e:
178 logger.error(traceback.format_exc())
179 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.args[0]))
180 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.args[0])
181 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
182 build_in.post_deal(self.ns_inst_id, "false")
183 return dict(data={'error': e.args[0]}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
185 def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
187 servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
188 process_id = get_process_id('init', servicetemplate_id)
189 data = {"processId": process_id, "params": {"planInput": plan_input}}
190 logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
192 ret = workflow_run(data)
193 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
194 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
195 self.ns_inst_id, ret.get('status')))
196 if ret.get('status') == 1:
197 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
198 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
200 def start_activiti_workflow(self, job_id, plan_input, occ_id):
202 plans = WFPlanModel.objects.filter()
204 raise NSLCMException("No plan is found, you should deploy plan first!")
206 "processId": plans[0].process_id,
209 ret = activiti.exec_workflow(data)
210 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
211 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
212 self.ns_inst_id, ret.get('status')))
213 if ret.get('status') == 1:
214 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
215 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
217 def start_buildin_workflow(self, job_id, plan_input, occ_id):
218 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
219 BuildInWorkflowThread(plan_input, occ_id).start()
220 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
222 def start_buildin_grapflow(self, job_id, plan_input, occ_id):
223 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
224 run_ns_instantiate(plan_input, occ_id)
225 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
228 def get_vnf_vim_id(vim_id, vnf_vim, vnfdid):
229 new_vim_id = vnf_vim.get(vnfdid) if vnf_vim.get(vnfdid) else vim_id
232 raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
235 def get_vnf_vim_info(location_constraints):
237 for location in location_constraints:
238 if "vnfProfileId" in location:
239 vnfd_id = location["vnfProfileId"]
240 if len(location['locationConstraints']) == 1:
241 cloud_owner = location["locationConstraints"]["vimId"].split('_')[0]
242 cloud_regionid = location["locationConstraints"]["vimId"].split('_')[1]
243 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
244 vnf_vim[vnfd_id] = vim_id
245 elif len(location['locationConstraints']) == 2:
246 cloud_owner = location["locationConstraints"]["cloudOwner"]
247 cloud_regionid = location["locationConstraints"]["cloudRegionId"]
248 vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
249 vnf_vim[vnfd_id] = vim_id
253 def set_vl_vim_id(vim_id, vnf_vim, plan_dict):
254 if "vls" not in plan_dict:
255 logger.debug("No vl is found in nsd.")
258 for vnf in ignore_case_get(plan_dict, "vnfs"):
259 if "dependencies" in vnf:
260 for depend in vnf["dependencies"]:
261 vl_vnf[depend["vl_id"]] = vnf['properties']['id']
262 for vl in plan_dict["vls"]:
263 vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
264 vimid = ignore_case_get(vnf_vim, vnfdid)
267 if "location_info" not in vl["properties"]:
268 vl["properties"]["location_info"] = {}
269 vl["properties"]["location_info"]["vimid"] = vimid
272 def get_model_count(context):
273 data = json.JSONDecoder().decode(context)
274 vls = len(data.get('vls', []))
275 sfcs = len(data.get('fps', []))
276 vnfs = len(data.get('vnfs', []))
277 pnfs = len(data.get('pnfs', []))
278 return {'vlCount': vls, 'sfcCount': sfcs, 'vnfCount': vnfs, 'pnfCount': pnfs}
280 def init_pnf_para(self, plan_dict):
281 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData", [])
282 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs", [])
283 logger.debug("addpnfData ; %s" % pnfs_in_input)
284 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
286 for pnf in pnfs_in_input:
287 for pnfd in pnfs_in_nsd:
288 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
290 pnf["nsInstances"] = self.ns_inst_id