# Copyright 2016-2017 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import time
import traceback
import uuid
from threading import Thread

from rest_framework import status

from lcm.pub.config import config
from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
from lcm.pub.exceptions import NSLCMException
from lcm.pub.msapi import activiti
from lcm.pub.msapi import sdc_run_catalog
from lcm.pub.msapi.catalog import get_process_id
from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
from lcm.pub.msapi import extsys
from lcm.pub.msapi.wso2bpel import workflow_run
from lcm.pub.utils.jobutil import JobUtil
from lcm.pub.utils.values import ignore_case_get
from lcm.workflows import build_in
from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
# Module-level logger shared by every class and function in this file.
logger = logging.getLogger(__name__)
class BuildInWorkflowThread(Thread):
    """Background thread that runs the build-in NS instantiation workflow.

    The thread only stores the workflow input at construction time; the
    workflow itself is executed by ``run()``, i.e. after ``start()`` is
    called, so the caller is not blocked.
    """

    def __init__(self, plan_input):
        """Remember ``plan_input`` for the workflow run.

        Args:
            plan_input: the workflow input handed unchanged to
                ``build_in.run_ns_instantiate``.
        """
        # The base initialiser must run, otherwise start() raises
        # RuntimeError("thread.__init__() not called").
        super(BuildInWorkflowThread, self).__init__()
        self.plan_input = plan_input

    def run(self):
        """Execute the build-in instantiation workflow on this thread."""
        build_in.run_ns_instantiate(self.plan_input)
class InstantNSService(object):
    """Prepare and start the instantiation workflow for one NS instance.

    NOTE(review): the reviewed text of this class carried embedded line
    numbers, had no indentation and was missing a number of short lines.
    Statements that had to be inferred are tagged "reconstructed" below and
    should be confirmed against version control before merging.
    """

    def __init__(self, ns_inst_id, plan_content):
        """Store the NS instance id and the instantiation request body.

        Args:
            ns_inst_id: id of the ``NSInstModel`` row to instantiate.
            plan_content: request data; read with the keys
                ``additionalParamForNs``, ``locationConstraints`` and
                ``addpnfData`` by the methods of this class.
        """
        self.ns_inst_id = ns_inst_id
        self.req_data = plan_content

    @staticmethod
    def _error_message(exc):
        """Return a printable message for ``exc``.

        ``exc.message`` only exists on Python 2 exceptions (or on classes
        that define it themselves), so fall back to ``str(exc)``.
        """
        return getattr(exc, 'message', None) or str(exc)

    def do_biz(self):
        """Build the workflow input for this NS and start the workflow.

        Creates a job, parses the NSD with the NS parameters of the request,
        asks ``extsys.select_vnfm`` for a VNFM per VNF, stores the resolved
        plan on the NS record and dispatches to the workflow selected by
        ``config.WORKFLOW_OPTION``.

        Returns:
            dict with ``data`` and ``status`` keys. When any step raises, the
            job (if one was created) is set to progress 255 and the dict
            carries the error text with HTTP status 500.
        """
        # Bound before the try block so the except handler can never hit an
        # unbound name when job creation itself fails.
        job_id = None
        try:
            job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
            logger.debug('ns-instant(%s) workflow starting...', self.ns_inst_id)
            logger.debug('req_data=%s', self.req_data)
            ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)

            # The NS-level parameters are optional; treat a missing key as {}.
            has_ns_params = 'additionalParamForNs' in self.req_data
            ns_params = self.req_data['additionalParamForNs'] if has_ns_params else {}
            input_parameters = [{"key": key, "value": val} for key, val in ns_params.items()]
            # reconstructed: default used when the request names no location.
            vim_id = ''
            if 'location' in ns_params:
                vim_id = ns_params['location']
            params_json = json.dumps(ns_params)

            location_constraints = []
            if 'locationConstraints' in self.req_data:
                location_constraints = self.req_data['locationConstraints']

            JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
            dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
            logger.debug('tosca plan dest: %s', dst_plan)
            logger.debug('Start query nsd(%s)', ns_inst.nspackage_id)
            NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)

            params_vnf = []  # reconstructed: accumulator encoded further down
            plan_dict = json.loads(dst_plan)
            for vnf in ignore_case_get(plan_dict, "vnfs"):
                vnfd_id = vnf['properties']['id']
                vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
                logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
                # vnfm_info may be a list; only its first entry is used.
                if isinstance(vnfm_type, list):
                    vnfm_type = vnfm_type[0]
                vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
                vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)

                params_vnf.append({
                    "vnfProfileId": vnf["vnf_id"],
                    # reconstructed: wrapper key and "vimId" entry - TODO confirm
                    "additionalParam": {
                        "properties": json.dumps(vnf['properties']),
                        "vimId": vimid,
                        "vnfmInstanceId": vnfm_info["vnfmId"],
                        "vnfmType": vnfm_type,
                        "inputs": params_json
                    }
                })

            # Stamp the VIM on every VL, then persist the enriched plan.
            self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
            dst_plan = json.dumps(plan_dict)
            logger.debug('tosca plan dest add vimid:%s', dst_plan)
            NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)

            pnf_params_json = json.dumps(self.init_pnf_para(plan_dict))
            vnf_params_json = json.dumps(params_vnf)

            plan_input = {
                'jobId': job_id,  # reconstructed - TODO confirm
                'nsInstanceId': self.ns_inst_id,
                'object_context': dst_plan,
                'object_additionalParamForNs': params_json,
                'object_additionalParamForVnf': vnf_params_json,
                'object_additionalParamForPnf': pnf_params_json
            }
            plan_input.update(**self.get_model_count(dst_plan))

            if has_ns_params:
                plan_input["sdnControllerId"] = ignore_case_get(ns_params, "sdncontroller")

            ServiceBaseInfoModel(service_id=self.ns_inst_id,
                                 service_name=ns_inst.name,
                                 service_type='NFVO',  # reconstructed - TODO confirm
                                 description=ns_inst.description,
                                 active_status='--',  # reconstructed - TODO confirm
                                 status=ns_inst.status,
                                 creator='--',  # reconstructed - TODO confirm
                                 create_time=int(time.time() * 1000)).save()

            # NOTE(review): the nesting of the two loops below under the wso2
            # branch is inferred (indentation was lost) - TODO confirm.
            if config.WORKFLOW_OPTION == "wso2":
                service_tpl = get_servicetemplate(ns_inst.nsd_id)
                DefPkgMappingModel(service_id=self.ns_inst_id,
                                   service_def_id=service_tpl['csarId'],
                                   template_name=service_tpl['templateName'],
                                   template_id=service_tpl['serviceTemplateId']).save()

                # ns_params is {} when the request has no NS parameters, so
                # this no longer raises KeyError for such requests.
                for key, val in ns_params.items():
                    InputParamMappingModel(service_id=self.ns_inst_id,
                                           input_key=key,  # reconstructed - TODO confirm
                                           input_value=val).save()

                for vnffg in ignore_case_get(plan_dict, "vnffgs"):
                    VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
                                   vnffginstid=str(uuid.uuid4()),
                                   nsinstid=self.ns_inst_id,
                                   endpointnumber=0).save()

            logger.debug("workflow option: %s", config.WORKFLOW_OPTION)
            if config.WORKFLOW_OPTION == "wso2":
                return self.start_wso2_workflow(job_id, ns_inst, plan_input)
            if config.WORKFLOW_OPTION == "activiti":
                # The method requires both arguments; calling it bare raised
                # TypeError.
                return self.start_activiti_workflow(job_id, plan_input)
            if config.WORKFLOW_OPTION == "grapflow":
                return self.start_buildin_grapflow(job_id, plan_input)
            return self.start_buildin_workflow(job_id, plan_input)

        except Exception as e:
            error_msg = self._error_message(e)
            logger.error(traceback.format_exc())
            logger.error("ns-instant(%s) workflow error:%s", self.ns_inst_id, error_msg)
            if job_id is not None:
                JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
            return dict(data={'error': error_msg}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def start_wso2_workflow(self, job_id, ns_inst, plan_input):
        """Start the 'init' process of the NS service template via wso2.

        Returns:
            dict with the job id and HTTP 200 when the workflow reports
            status 1, otherwise the workflow's message with HTTP 500.
        """
        servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
        process_id = get_process_id('init', servicetemplate_id)
        data = {"processId": process_id, "params": {"planInput": plan_input}}
        logger.debug('ns-instant(%s) workflow data:%s', self.ns_inst_id, data)

        ret = workflow_run(data)
        logger.info("ns-instant(%s) workflow result:%s", self.ns_inst_id, ret)
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
            self.ns_inst_id, ret.get('status')))
        if ret.get('status') == 1:
            return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
        return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def start_activiti_workflow(self, job_id, plan_input):
        """Start the first deployed activiti plan with ``plan_input``.

        Raises:
            NSLCMException: when no workflow plan has been deployed.

        Returns:
            dict with the job id and HTTP 200 when the workflow reports
            status 1, otherwise the workflow's message with HTTP 500.
        """
        plans = WFPlanModel.objects.filter()
        if not plans:
            raise NSLCMException("No plan is found, you should deploy plan first!")
        data = {
            "processId": plans[0].process_id,
            "params": plan_input  # reconstructed - TODO confirm key name
        }
        ret = activiti.exec_workflow(data)
        logger.info("ns-instant(%s) workflow result:%s", self.ns_inst_id, ret)
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
            self.ns_inst_id, ret.get('status')))
        if ret.get('status') == 1:
            return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
        return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def start_buildin_workflow(self, job_id, plan_input):
        """Run the build-in workflow on a background thread and return at once."""
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
        BuildInWorkflowThread(plan_input).start()
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)

    def start_buildin_grapflow(self, job_id, plan_input):
        """Call ``run_ns_instantiate`` for the graph flow, then report the job id."""
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
        run_ns_instantiate(plan_input)
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)

    @staticmethod
    def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
        """Resolve the VIM id for one VNF.

        A location constraint whose ``vnfProfileId`` equals ``vnfdid`` wins;
        otherwise the request-wide ``vim_id`` is used when it is set.

        Raises:
            NSLCMException: when neither source provides a VIM id.
        """
        for location in location_constraints:
            if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
                return location["locationConstraints"]["vimId"]
        # reconstructed: fall back to the request-wide VIM id.
        if vim_id:
            return vim_id
        raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)

    @staticmethod
    def set_vl_vim_id(vim_id, location_constraints, plan_dict):
        """Write ``location_info.vimid`` into every VL of ``plan_dict``.

        Each VL takes the VIM of the VNF that lists it as a dependency, as
        given by the location constraints, and otherwise the request-wide
        ``vim_id``. ``plan_dict`` is modified in place.

        Raises:
            NSLCMException: when no VIM id can be determined for a VL.
        """
        if "vls" not in plan_dict:
            logger.debug("No vl is found in nsd.")
            return

        # vl_id -> id of the VNF that depends on that VL.
        vl_vnf = {}
        for vnf in ignore_case_get(plan_dict, "vnfs"):
            for depend in vnf.get("dependencies", []):
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']

        # vnf profile id -> VIM id taken from the location constraints.
        vnf_vim = {}
        for location in location_constraints:
            if "vnfProfileId" in location:
                vnf_vim[location["vnfProfileId"]] = location["locationConstraints"]["vimId"]

        for vl in plan_dict["vls"]:
            vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
            # reconstructed: fall back to the request-wide VIM id.
            vimid = ignore_case_get(vnf_vim, vnfdid) or vim_id
            if not vimid:
                raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
            vl["properties"].setdefault("location_info", {})["vimid"] = vimid

    @staticmethod
    def get_model_count(context):
        """Count the VLs, forwarding paths, VNFs and PNFs in a JSON plan.

        Args:
            context: the plan as a JSON object string.

        Returns:
            dict with ``vlCount``, ``sfcCount`` (length of ``fps``),
            ``vnfCount`` and ``pnfCount``, each as a string.
        """
        data = json.loads(context)
        counted = (('vlCount', 'vls'), ('sfcCount', 'fps'), ('vnfCount', 'vnfs'), ('pnfCount', 'pnfs'))
        return dict((name, str(len(data.get(key, [])))) for name, key in counted)

    def init_pnf_para(self, plan_dict):
        """Match the PNFs of the request against the PNFs of the NSD.

        Every request PNF whose ``pnfdId`` equals an NSD PNF's
        ``descriptor_id`` gets this NS instance id written to its
        ``nsInstances`` key.

        NOTE(review): the accumulator, the per-PNF entry and the return
        statement were missing from the reviewed text. They are reconstructed
        from the fact that ``do_biz`` JSON-encodes this method's result;
        confirm the exact shape against version control.
        """
        pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
        pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
        logger.debug("addpnfData ; %s", pnfs_in_input)
        logger.debug("pnfs_in_nsd ; %s", pnfs_in_nsd)
        pnfs = {}
        for pnf in pnfs_in_input:
            for pnfd in pnfs_in_nsd:
                if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
                    k = pnfd["pnf_id"]  # reconstructed - TODO confirm
                    pnf["nsInstances"] = self.ns_inst_id
                    # reconstructed - TODO confirm
                    pnfs[k] = {
                        "type": "CreatePnf",
                        "input": {
                            "content": pnf
                        }
                    }
        return pnfs