1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
38 logger = logging.getLogger(__name__)
41 class BuildInWorkflowThread(Thread):
42 def __init__(self, plan_input):
44 self.plan_input = plan_input
47 build_in.run_ns_instantiate(self.plan_input)
50 class InstantNSService(object):
51 def __init__(self, ns_inst_id, plan_content):
52 self.ns_inst_id = ns_inst_id
53 self.req_data = plan_content
57 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
58 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
59 logger.debug('req_data=%s' % self.req_data)
60 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
64 if 'additionalParamForNs' in self.req_data:
65 for key, val in self.req_data['additionalParamForNs'].items():
66 input_parameters.append({"key": key, "value": val})
67 if 'location' in self.req_data['additionalParamForNs']:
68 vim_id = self.req_data['additionalParamForNs']['location']
69 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
71 params_json = json.JSONEncoder().encode({})
73 location_constraints = []
74 if 'locationConstraints' in self.req_data:
75 location_constraints = self.req_data['locationConstraints']
77 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
78 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
79 logger.debug('tosca plan dest: %s' % dst_plan)
80 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
81 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
84 plan_dict = json.JSONDecoder().decode(dst_plan)
85 for vnf in ignore_case_get(plan_dict, "vnfs"):
86 vnfd_id = vnf['properties']['id']
87 vnfm_type_temp = vnf['properties'].get("vnfm_info", "undefined")
88 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type_temp)
89 if vnfm_type_temp != "undefined":
90 if isinstance(vnfm_type_temp, list):
91 vnfm_type = vnfm_type_temp[0]
92 if isinstance(vnfm_type_temp, str):
93 vnfm_type = vnfm_type_temp
94 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
95 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
98 "vnfProfileId": vnf["vnf_id"],
100 "properties": json.JSONEncoder().encode(vnf['properties']),
102 "vnfmInstanceId": vnfm_info["vnfmId"],
103 "vnfmType": vnfm_type,
104 "inputs": params_json
108 self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
109 dst_plan = json.JSONEncoder().encode(plan_dict)
110 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
111 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
113 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
115 vnf_params_json = json.JSONEncoder().encode(params_vnf)
118 'nsInstanceId': self.ns_inst_id,
119 'object_context': dst_plan,
120 'object_additionalParamForNs': params_json,
121 'object_additionalParamForVnf': vnf_params_json,
122 'object_additionalParamForPnf': pnf_params_json
124 plan_input.update(**self.get_model_count(dst_plan))
126 if 'additionalParamForNs' in self.req_data:
127 plan_input["sdnControllerId"] = ignore_case_get(
128 self.req_data['additionalParamForNs'], "sdncontroller")
130 ServiceBaseInfoModel(service_id=self.ns_inst_id,
131 service_name=ns_inst.name,
133 description=ns_inst.description,
135 status=ns_inst.status,
137 create_time=int(time.time() * 1000)).save()
139 if config.WORKFLOW_OPTION == "wso2":
140 service_tpl = get_servicetemplate(ns_inst.nsd_id)
141 DefPkgMappingModel(service_id=self.ns_inst_id,
142 service_def_id=service_tpl['csarId'],
143 template_name=service_tpl['templateName'],
144 template_id=service_tpl['serviceTemplateId']).save()
146 for key, val in self.req_data['additionalParamForNs'].items():
147 InputParamMappingModel(service_id=self.ns_inst_id,
149 input_value=val).save()
151 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
152 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
153 vnffginstid=str(uuid.uuid4()),
154 nsinstid=self.ns_inst_id,
155 endpointnumber=0).save()
159 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
160 if config.WORKFLOW_OPTION == "wso2":
161 return self.start_wso2_workflow(job_id, ns_inst, plan_input)
162 elif config.WORKFLOW_OPTION == "activiti":
163 return self.start_activiti_workflow()
164 elif config.WORKFLOW_OPTION == "grapflow":
165 return self.start_buildin_grapflow(job_id, plan_input)
167 return self.start_buildin_workflow(job_id, plan_input)
169 except Exception as e:
170 logger.error(traceback.format_exc())
171 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
172 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
173 return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
175 def start_wso2_workflow(self, job_id, ns_inst, plan_input):
176 servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
177 process_id = get_process_id('init', servicetemplate_id)
178 data = {"processId": process_id, "params": {"planInput": plan_input}}
179 logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
181 ret = workflow_run(data)
182 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
183 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
184 self.ns_inst_id, ret.get('status')))
185 if ret.get('status') == 1:
186 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
187 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
189 def start_activiti_workflow(self, job_id, plan_input):
190 plans = WFPlanModel.objects.filter()
192 raise NSLCMException("No plan is found, you should deploy plan first!")
194 "processId": plans[0].process_id,
197 ret = activiti.exec_workflow(data)
198 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
199 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
200 self.ns_inst_id, ret.get('status')))
201 if ret.get('status') == 1:
202 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
203 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
205 def start_buildin_workflow(self, job_id, plan_input):
206 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
207 BuildInWorkflowThread(plan_input).start()
208 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
210 def start_buildin_grapflow(self, job_id, plan_input):
211 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
212 run_ns_instantiate(plan_input)
213 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
216 def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
217 for location in location_constraints:
218 if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
219 return location["locationConstraints"]["vimId"]
222 raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
225 def set_vl_vim_id(vim_id, location_constraints, plan_dict):
226 if "vls" not in plan_dict:
227 logger.debug("No vl is found in nsd.")
230 for vnf in ignore_case_get(plan_dict, "vnfs"):
231 if "dependencies" in vnf:
232 for depend in vnf["dependencies"]:
233 vl_vnf[depend["vl_id"]] = vnf['properties']['id']
235 for location in location_constraints:
236 if "vnfProfileId" in location:
237 vnfd_id = location["vnfProfileId"]
238 vnf_vim[vnfd_id] = location["locationConstraints"]["vimId"]
239 for vl in plan_dict["vls"]:
240 vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
241 vimid = ignore_case_get(vnf_vim, vnfdid)
245 raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
246 if "location_info" not in vl["properties"]:
247 vl["properties"]["location_info"] = {}
248 vl["properties"]["location_info"]["vimid"] = vimid
251 def get_model_count(context):
252 data = json.JSONDecoder().decode(context)
253 vls = len(data.get('vls', []))
254 sfcs = len(data.get('fps', []))
255 vnfs = len(data.get('vnfs', []))
256 pnfs = len(data.get('pnfs', []))
257 return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs), 'pnfCount': str(pnfs)}
259 def init_pnf_para(self, plan_dict):
260 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
261 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
262 logger.debug("addpnfData ; %s" % pnfs_in_input)
263 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
265 for pnf in pnfs_in_input:
266 for pnfd in pnfs_in_nsd:
267 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
269 pnf["nsInstances"] = self.ns_inst_id
270 # todo pnf["pnfdInfoId"]