1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class BuildInWorkflowThread(Thread):
    """Worker thread that runs the built-in NS instantiation workflow.

    The constructor only stores its arguments; the workflow itself is
    executed by ``run()``, i.e. on the new thread once ``start()`` is called.
    """

    def __init__(self, plan_input, occ_id):
        """Store the workflow arguments without starting any work.

        :param plan_input: value passed as first argument to
            ``build_in.run_ns_instantiate``.
        :param occ_id: value passed as second argument to
            ``build_in.run_ns_instantiate``.
        """
        # A Thread subclass must initialise its base class before start()
        # may be called on the instance.
        Thread.__init__(self)
        self.plan_input = plan_input
        self.occ_id = occ_id

    def run(self):
        """Thread body: execute the built-in instantiate workflow."""
        build_in.run_ns_instantiate(self.plan_input, self.occ_id)
52 class InstantNSService(object):
53 def __init__(self, ns_inst_id, plan_content):
54 self.ns_inst_id = ns_inst_id
55 self.req_data = plan_content
58 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
59 occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
62 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
63 logger.debug('req_data=%s' % self.req_data)
64 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
68 if 'additionalParamForNs' in self.req_data:
69 for key, val in self.req_data['additionalParamForNs'].items():
70 input_parameters.append({"key": key, "value": val})
71 if 'location' in self.req_data['additionalParamForNs']:
72 vim_id = self.req_data['additionalParamForNs']['location']
73 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
75 params_json = json.JSONEncoder().encode({})
77 location_constraints = []
78 if 'locationConstraints' in self.req_data:
79 location_constraints = self.req_data['locationConstraints']
81 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
82 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
83 logger.debug('tosca plan dest: %s' % dst_plan)
84 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
85 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
88 plan_dict = json.JSONDecoder().decode(dst_plan)
89 for vnf in ignore_case_get(plan_dict, "vnfs"):
90 vnfd_id = vnf['properties']['id']
91 vnfm_type_temp = vnf['properties'].get("vnfm_info", "undefined")
92 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type_temp)
93 vnfm_type = vnfm_type_temp
94 if isinstance(vnfm_type_temp, list):
95 vnfm_type = vnfm_type_temp[0]
96 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
97 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
100 "vnfProfileId": vnf["vnf_id"],
102 "properties": json.JSONEncoder().encode(vnf['properties']),
104 "vnfmInstanceId": vnfm_info["vnfmId"],
105 "vnfmType": vnfm_type,
106 "inputs": params_json
110 self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
111 dst_plan = json.JSONEncoder().encode(plan_dict)
112 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
113 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
115 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
117 vnf_params_json = json.JSONEncoder().encode(params_vnf)
120 'nsInstanceId': self.ns_inst_id,
121 'object_context': dst_plan,
122 'object_additionalParamForNs': params_json,
123 'object_additionalParamForVnf': vnf_params_json,
124 'object_additionalParamForPnf': pnf_params_json
126 plan_input.update(**self.get_model_count(dst_plan))
128 if 'additionalParamForNs' in self.req_data:
129 plan_input["sdnControllerId"] = ignore_case_get(
130 self.req_data['additionalParamForNs'], "sdncontroller")
132 ServiceBaseInfoModel(service_id=self.ns_inst_id,
133 service_name=ns_inst.name,
135 description=ns_inst.description,
137 status=ns_inst.status,
139 create_time=int(time.time() * 1000)).save()
141 if config.WORKFLOW_OPTION == "wso2":
142 service_tpl = get_servicetemplate(ns_inst.nsd_id)
143 DefPkgMappingModel(service_id=self.ns_inst_id,
144 service_def_id=service_tpl['csarId'],
145 template_name=service_tpl['templateName'],
146 template_id=service_tpl['serviceTemplateId']).save()
148 for key, val in self.req_data['additionalParamForNs'].items():
149 InputParamMappingModel(service_id=self.ns_inst_id,
151 input_value=val).save()
153 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
154 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
155 vnffginstid=str(uuid.uuid4()),
156 nsinstid=self.ns_inst_id,
157 endpointnumber=0).save()
161 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
162 if config.WORKFLOW_OPTION == "wso2":
163 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
164 elif config.WORKFLOW_OPTION == "activiti":
165 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
166 elif config.WORKFLOW_OPTION == "grapflow":
167 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
169 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
171 except Exception as e:
172 logger.error(traceback.format_exc())
173 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
174 NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
175 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
176 return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
    """Submit the instantiate plan to the WSO2 BPEL workflow engine.

    Resolves the service template for ``ns_inst.nsd_id``, looks up its
    ``'init'`` process id, runs the workflow and records the reported
    status on the job.

    :param job_id: job that receives the progress entry.
    :param ns_inst: NS instance record; only ``nsd_id`` is read here.
    :param plan_input: workflow input, sent as ``params.planInput``.
    :param occ_id: operation-occurrence id, echoed back on success.
    :return: dict with ``data`` and ``status``; the success response also
        carries ``occ_id``.
    """
    servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
    process_id = get_process_id('init', servicetemplate_id)
    data = {"processId": process_id, "params": {"planInput": plan_input}}
    logger.debug('ns-instant(%s) workflow data:%s', self.ns_inst_id, data)

    ret = workflow_run(data)
    logger.info("ns-instant(%s) workflow result:%s", self.ns_inst_id, ret)
    workflow_status = ret.get('status')
    JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
        self.ns_inst_id, workflow_status))
    if workflow_status == 1:
        # Include occ_id in the success response, as the activiti and
        # build-in starters do, so every workflow option yields the same shape.
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
    return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_activiti_workflow(self, job_id, plan_input, occ_id):
    """Submit the instantiate plan to the activiti workflow engine.

    Uses the first deployed workflow plan found in ``WFPlanModel``.

    :param job_id: job that receives the progress entry.
    :param plan_input: workflow input forwarded to the engine.
    :param occ_id: operation-occurrence id, echoed back on success.
    :return: dict with ``data`` and ``status``; the success response also
        carries ``occ_id``.
    :raises NSLCMException: when no workflow plan has been deployed.
    """
    plans = WFPlanModel.objects.filter()
    # Only fail when there really is no plan; plans[0] below needs one.
    if not plans:
        raise NSLCMException("No plan is found, you should deploy plan first!")
    data = {
        "processId": plans[0].process_id,
        # NOTE(review): the key under which plan_input is sent is assumed to
        # be "params" -- confirm against activiti.exec_workflow's contract.
        "params": plan_input,
    }
    ret = activiti.exec_workflow(data)
    logger.info("ns-instant(%s) workflow result:%s", self.ns_inst_id, ret)
    workflow_status = ret.get('status')
    JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
        self.ns_inst_id, workflow_status))
    if workflow_status == 1:
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
    return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_buildin_workflow(self, job_id, plan_input, occ_id):
    """Launch the built-in workflow on a worker thread and acknowledge.

    :param job_id: job that receives the progress entry.
    :param plan_input: workflow input handed to the worker thread.
    :param occ_id: operation-occurrence id handed to the worker thread and
        echoed back in the response.
    :return: dict with ``data``, ``status`` and ``occ_id``.
    """
    progress_msg = 'NS inst(%s) buildin workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, progress_msg)

    # The method returns right after start(); it does not join the thread.
    worker = BuildInWorkflowThread(plan_input, occ_id)
    worker.start()

    return {
        'data': {'jobId': job_id},
        'status': status.HTTP_200_OK,
        'occ_id': occ_id,
    }
def start_buildin_grapflow(self, job_id, plan_input, occ_id):
    """Run the built-in graph workflow and acknowledge the request.

    :param job_id: job that receives the progress entry.
    :param plan_input: workflow input passed to ``run_ns_instantiate``.
    :param occ_id: operation-occurrence id passed to ``run_ns_instantiate``
        and echoed back in the response.
    :return: dict with ``data``, ``status`` and ``occ_id``.
    """
    JobUtil.add_job_status(
        job_id, 10,
        'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)

    # Called directly: this method creates no worker thread of its own.
    run_ns_instantiate(plan_input, occ_id)

    response = {'data': {'jobId': job_id}}
    response['status'] = status.HTTP_200_OK
    response['occ_id'] = occ_id
    return response
@staticmethod
def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
    """Resolve the VIM on which a VNF should be placed.

    A location constraint whose ``vnfProfileId`` equals ``vnfdid`` wins;
    otherwise the request-wide ``vim_id`` is used as the fallback.

    :param vim_id: default VIM for the whole request; may be empty.
    :param location_constraints: iterable of dicts, each optionally holding
        ``vnfProfileId`` and ``locationConstraints.vimId``.
    :param vnfdid: VNF descriptor id to look up.
    :return: the matching constraint's ``vimId`` or, failing that, ``vim_id``.
    :raises NSLCMException: when neither source provides a VIM.
    """
    for location in location_constraints:
        if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
            return location["locationConstraints"]["vimId"]
    # No per-VNF constraint matched: fall back to the request-wide default
    # instead of ignoring the vim_id argument.
    if vim_id:
        return vim_id
    raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
@staticmethod
def set_vl_vim_id(vim_id, location_constraints, plan_dict):
    """Write the owning VIM into every virtual link of ``plan_dict``.

    For each entry of ``plan_dict["vls"]`` the VIM is taken from the
    location constraint of the VNF that depends on that link, falling back
    to the request-wide ``vim_id``. The result is stored in place under
    ``vl["properties"]["location_info"]["vimid"]``.

    :param vim_id: default VIM for the whole request; may be empty.
    :param location_constraints: iterable of dicts, each optionally holding
        ``vnfProfileId`` and ``locationConstraints.vimId``.
    :param plan_dict: parsed NSD plan; mutated in place.
    :raises NSLCMException: when no VIM can be determined for a link.
    """
    if "vls" not in plan_dict:
        logger.debug("No vl is found in nsd.")
        # Nothing to annotate; stop before plan_dict["vls"] is indexed.
        return

    # vl_id -> descriptor id of the VNF that lists the link as a dependency.
    vl_vnf = {}
    for vnf in ignore_case_get(plan_dict, "vnfs"):
        if "dependencies" in vnf:
            for depend in vnf["dependencies"]:
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']

    # VNF profile id -> VIM id taken from the location constraints.
    vnf_vim = {}
    for location in location_constraints:
        if "vnfProfileId" in location:
            vnfd_id = location["vnfProfileId"]
            vnf_vim[vnfd_id] = location["locationConstraints"]["vimId"]

    for vl in plan_dict["vls"]:
        vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
        vimid = ignore_case_get(vnf_vim, vnfdid)
        # NOTE(review): assumes ignore_case_get yields a falsy value when the
        # key is absent -- confirm against lcm.pub.utils.values.
        if not vimid:
            vimid = vim_id
        if not vimid:
            raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
        if "location_info" not in vl["properties"]:
            vl["properties"]["location_info"] = {}
        vl["properties"]["location_info"]["vimid"] = vimid
@staticmethod
def get_model_count(context):
    """Count the VLs, forwarding paths, VNFs and PNFs in a JSON plan.

    :param context: JSON text of the plan; expected to decode to an object.
    :return: dict with the string-valued keys ``vlCount``, ``sfcCount``,
        ``vnfCount`` and ``pnfCount``. A key that is missing from the plan,
        or present with a ``null`` value, counts as ``'0'``.
    """
    data = json.loads(context)
    # Result key -> plan key; 'sfcCount' is derived from the 'fps' list.
    counted = (
        ('vlCount', 'vls'),
        ('sfcCount', 'fps'),
        ('vnfCount', 'vnfs'),
        ('pnfCount', 'pnfs'),
    )
    # "or []" keeps an explicit JSON null from reaching len().
    return {label: str(len(data.get(key) or [])) for label, key in counted}
264 def init_pnf_para(self, plan_dict):
265 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
266 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
267 logger.debug("addpnfData ; %s" % pnfs_in_input)
268 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
270 for pnf in pnfs_in_input:
271 for pnfd in pnfs_in_nsd:
272 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
274 pnf["nsInstances"] = self.ns_inst_id