1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
# Module-level logger, named after this module via __name__; used by every class below.
38 logger = logging.getLogger(__name__)
class BuildInWorkflowThread(Thread):
    """Thread that runs the built-in NS instantiation workflow in the background."""

    def __init__(self, plan_input):
        """Remember the workflow input; nothing runs until ``start()`` is called.

        :param plan_input: value handed unchanged to
            ``build_in.run_ns_instantiate`` when the thread runs
        """
        # The base initializer must run first, otherwise start()/is_alive()
        # fail with "thread.__init__() not called".
        Thread.__init__(self)
        self.plan_input = plan_input

    def run(self):
        """Thread entry point: execute the built-in workflow with the stored input."""
        build_in.run_ns_instantiate(self.plan_input)
50 class InstantNSService(object):
def __init__(self, ns_inst_id, plan_content):
    """Bind the service to one NS instance and its instantiate request.

    :param ns_inst_id: id of the NS instance to instantiate
    :param plan_content: request payload; stored as ``req_data``
    """
    self.req_data = plan_content
    self.ns_inst_id = ns_inst_id
# Body of the NS instantiation flow for self.ns_inst_id.
# NOTE(review): the enclosing `def` line, the matching `try:` and several
# initialisers (input_parameters, vim_id, params_vnf, plan_input) are not
# visible in this listing; the comments below describe only the visible code.
#
# Visible steps: create an NS_INST job, parse the NSD with the request's
# additionalParamForNs, pick a VIM and VNFM for each VNF, store the resulting
# plan on the NS instance, persist bookkeeping rows, then dispatch to the
# workflow engine selected by config.WORKFLOW_OPTION. Each start_* helper
# returns a dict with 'data' and 'status', and so does the except branch.
57 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
58 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
59 logger.debug('req_data=%s' % self.req_data)
60 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
# Flatten additionalParamForNs into {"key", "value"} pairs for parse_nsd below.
63 for key, val in self.req_data['additionalParamForNs'].items():
64 input_parameters.append({"key": key, "value": val})
# NS-wide VIM: taken from additionalParamForNs['location'] when that key is present.
67 if 'location' in self.req_data['additionalParamForNs']:
68 vim_id = self.req_data['additionalParamForNs']['location']
# Per-VNF placement constraints are optional in the request; default is an empty list.
69 location_constraints = []
70 if 'locationConstraints' in self.req_data:
71 location_constraints = self.req_data['locationConstraints']
# Parse the NS descriptor of the instance's package and store the result on the NS record.
73 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
74 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
75 logger.debug('tosca plan dest: %s' % dst_plan)
76 logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
77 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
79 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
# dst_plan is a JSON string; decode it to walk the VNFs.
81 plan_dict = json.JSONDecoder().decode(dst_plan)
# For every VNF: resolve its VIM, then ask extsys for a VNFM of the VNF's nf_type on that VIM.
82 for vnf in ignore_case_get(plan_dict, "vnfs"):
83 vnfd_id = vnf['properties']['id']
84 vnfm_type = vnf['properties'].get("nf_type", "undefined")
85 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
86 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
# NOTE(review): the entries below belong to a per-VNF parameter structure whose
# opening and closing lines are not visible; presumably it is collected into
# params_vnf, which is encoded further down - confirm.
89 "vnfProfileId": vnf["vnf_id"],
91 "properties": json.JSONEncoder().encode(vnf['properties']),
93 "vnfmInstanceId": vnfm_info["vnfmId"],
94 "vnfmType": vnfm_type,
# set_vl_vim_id works on plan_dict in place; the plan is then re-encoded and stored again.
99 self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
100 dst_plan = json.JSONEncoder().encode(plan_dict)
101 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
102 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
104 pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
106 vnf_params_json = json.JSONEncoder().encode(params_vnf)
# Entries of the workflow input (plan_input); the opening of this literal is not visible.
109 'nsInstanceId': self.ns_inst_id,
110 'object_context': dst_plan,
111 'object_additionalParamForNs': params_json,
112 'object_additionalParamForVnf': vnf_params_json,
113 'object_additionalParamForPnf': pnf_params_json
# Add the vl/sfc/vnf/pnf counts (as strings) and the SDN controller id from the request.
115 plan_input.update(**self.get_model_count(dst_plan))
116 plan_input["sdnControllerId"] = ignore_case_get(
117 self.req_data['additionalParamForNs'], "sdncontroller")
# Persist a service base-info row for this NS instance; create_time is in milliseconds.
119 ServiceBaseInfoModel(service_id=self.ns_inst_id,
120 service_name=ns_inst.name,
122 description=ns_inst.description,
124 status=ns_inst.status,
126 create_time=int(time.time() * 1000)).save()
# Only for the wso2 engine: record which service template backs this service.
128 if config.WORKFLOW_OPTION == "wso2":
129 service_tpl = get_servicetemplate(ns_inst.nsd_id)
130 DefPkgMappingModel(service_id=self.ns_inst_id,
131 service_def_id=service_tpl['csarId'],
132 template_name=service_tpl['templateName'],
133 template_id=service_tpl['serviceTemplateId']).save()
# One InputParamMappingModel row per additionalParamForNs entry.
135 for key, val in self.req_data['additionalParamForNs'].items():
136 InputParamMappingModel(service_id=self.ns_inst_id,
138 input_value=val).save()
# One VNFFG instance row per vnffg in the plan, with a fresh UUID and endpointnumber 0.
140 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
141 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
142 vnffginstid=str(uuid.uuid4()),
143 nsinstid=self.ns_inst_id,
144 endpointnumber=0).save()
# Dispatch on the configured engine; the last return is the built-in workflow
# (its `else:` line is not visible).
148 logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
149 if config.WORKFLOW_OPTION == "wso2":
150 return self.start_wso2_workflow(job_id, ns_inst, plan_input)
151 elif config.WORKFLOW_OPTION == "activiti":
# NOTE(review): start_activiti_workflow is declared as (self, job_id, plan_input)
# but is called here without arguments; this branch would raise TypeError and
# end up in the except handler below. Likely needs (job_id, plan_input) - confirm.
152 return self.start_activiti_workflow()
153 elif config.WORKFLOW_OPTION == "grapflow":
154 return self.start_buildin_grapflow(job_id, plan_input)
156 return self.start_buildin_workflow(job_id, plan_input)
# Any failure: log the traceback, record 255 with a failure text on the job,
# and return the error with HTTP 500.
# NOTE(review): e.message is a Python 2 attribute; on Python 3 an exception
# class that does not define `message` would raise AttributeError here - confirm
# the target interpreter.
158 except Exception as e:
159 logger.error(traceback.format_exc())
160 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
161 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
162 return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_wso2_workflow(self, job_id, ns_inst, plan_input):
    """Start NS instantiation through the wso2 workflow engine.

    Resolves the service template of ``ns_inst.nsd_id``, looks up its
    ``init`` process and submits ``plan_input`` to it.

    :param job_id: job that receives the progress entry (value 10)
    :param ns_inst: NS instance record; only ``nsd_id`` is read
    :param plan_input: workflow input, sent as ``params.planInput``
    :return: dict with ``data`` and ``status``: the job id with HTTP 200
        when the engine reports status 1, otherwise the engine's
        ``message`` with HTTP 500
    """
    process_id = get_process_id('init', get_servicetemplate_id(ns_inst.nsd_id))
    request_body = {"processId": process_id, "params": {"planInput": plan_input}}
    logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, request_body))

    result = workflow_run(request_body)
    logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, result))
    progress_text = 'NS inst(%s) wso2 workflow started: %s' % (
        self.ns_inst_id, result.get('status'))
    JobUtil.add_job_status(job_id, 10, progress_text)

    # Anything other than status 1 is reported back as a server error.
    if result.get('status') != 1:
        return dict(data={'error': result['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
# Start NS instantiation through the activiti engine using the first stored workflow plan.
# Returns a dict with 'data' and 'status': the job id with HTTP 200 when the
# engine reports status 1, otherwise the engine's 'message' with HTTP 500.
# NOTE(review): the only call site visible in this file invokes this method
# without arguments, although job_id and plan_input are required - confirm.
178 def start_activiti_workflow(self, job_id, plan_input):
# filter() without arguments selects every stored plan; only plans[0] is used below.
179 plans = WFPlanModel.objects.filter()
# NOTE(review): the condition guarding this raise is not visible in this
# listing - presumably "no plan found"; confirm.
181 raise NSLCMException("No plan is found, you should deploy plan first!")
# NOTE(review): only this entry of the request passed to the engine (`data`) is
# visible; how plan_input is included cannot be told from here.
183 "processId": plans[0].process_id,
186 ret = activiti.exec_workflow(data)
187 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
# Record the engine's reported status on the job (value 10).
188 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
189 self.ns_inst_id, ret.get('status')))
# Status 1 is treated as success; anything else is returned as HTTP 500.
190 if ret.get('status') == 1:
191 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
192 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_buildin_workflow(self, job_id, plan_input):
    """Run the built-in workflow on a background thread and answer at once.

    :param job_id: job that receives the progress entry (value 10)
    :param plan_input: workflow input handed to ``BuildInWorkflowThread``
    :return: dict with the job id as ``data`` and HTTP 200 as ``status``
    """
    progress_text = 'NS inst(%s) buildin workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, progress_text)

    worker = BuildInWorkflowThread(plan_input)
    worker.start()

    return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
def start_buildin_grapflow(self, job_id, plan_input):
    """Run the built-in graph workflow and report the job id.

    ``run_ns_instantiate`` is called directly in the caller's thread,
    before this method returns.

    :param job_id: job that receives the progress entry (value 10)
    :param plan_input: workflow input passed to ``run_ns_instantiate``
    :return: dict with the job id as ``data`` and HTTP 200 as ``status``
    """
    progress_text = 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, progress_text)

    run_ns_instantiate(plan_input)

    return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
    """Resolve the VIM id to use for the VNF identified by ``vnfdid``.

    A location constraint whose ``vnfProfileId`` equals ``vnfdid`` wins;
    otherwise the NS-wide ``vim_id`` is used when it is set.

    NOTE(review): takes no ``self`` but is invoked through the instance in
    this file, so it is expected to be bound as a static method - confirm.

    :param vim_id: NS-wide default VIM id; may be empty
    :param location_constraints: iterable of dicts, each optionally holding
        ``vnfProfileId`` and ``locationConstraints.vimId``
    :param vnfdid: VNF descriptor id to look up
    :return: the VIM id selected for the VNF
    :raises NSLCMException: when no constraint matches and ``vim_id`` is empty
    """
    for location in location_constraints:
        if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
            return location["locationConstraints"]["vimId"]

    # No per-VNF constraint matched: fall back to the NS-wide VIM so a request
    # that only supplies a default location is still placeable.
    if vim_id:
        return vim_id
    raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
def set_vl_vim_id(vim_id, location_constraints, plan_dict):
    """Write the VIM id of every virtual link into ``plan_dict`` in place.

    A link gets the VIM of the VNF that lists it under ``dependencies``
    (taken from ``location_constraints``); when that yields nothing the
    NS-wide ``vim_id`` is used. The result is stored at
    ``vl["properties"]["location_info"]["vimid"]``.

    NOTE(review): takes no ``self`` but is invoked through the instance in
    this file, so it is expected to be bound as a static method - confirm.

    :param vim_id: NS-wide default VIM id; may be empty
    :param location_constraints: iterable of dicts, each optionally holding
        ``vnfProfileId`` and ``locationConstraints.vimId``
    :param plan_dict: decoded NSD plan; modified in place
    :return: None
    :raises NSLCMException: when a link has neither a VNF-derived VIM nor a
        default ``vim_id``
    """
    if "vls" not in plan_dict:
        logger.debug("No vl is found in nsd.")
        # Nothing to annotate; the loop below would fail on the missing key.
        return

    # vl id -> id of the VNF that declares a dependency on that link.
    vl_vnf = {}
    for vnf in ignore_case_get(plan_dict, "vnfs"):
        if "dependencies" in vnf:
            for depend in vnf["dependencies"]:
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']

    # VNF id -> VIM id, as given by the location constraints.
    vnf_vim = {}
    for location in location_constraints:
        if "vnfProfileId" in location:
            vnf_vim[location["vnfProfileId"]] = location["locationConstraints"]["vimId"]

    for vl in plan_dict["vls"]:
        vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
        # Presumably ignore_case_get yields a falsy value for a missing key;
        # in that case fall back to the NS-wide VIM.
        vimid = ignore_case_get(vnf_vim, vnfdid) or vim_id
        if not vimid:
            raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
        if "location_info" not in vl["properties"]:
            vl["properties"]["location_info"] = {}
        vl["properties"]["location_info"]["vimid"] = vimid
def get_model_count(context):
    """Count the vl, fp, vnf and pnf entries of a JSON-encoded plan.

    NOTE(review): takes no ``self`` but is invoked through the instance in
    this file, so it is expected to be bound as a static method - confirm.

    :param context: JSON text of the plan (an object at top level)
    :return: dict with ``vlCount``, ``sfcCount``, ``vnfCount`` and
        ``pnfCount``, each a decimal string; a missing section counts as 0
    """
    model = json.JSONDecoder().decode(context)
    # (result key, plan section) pairs; 'sfcCount' is fed by the 'fps' section.
    sections = (
        ('vlCount', 'vls'),
        ('sfcCount', 'fps'),
        ('vnfCount', 'vnfs'),
        ('pnfCount', 'pnfs'),
    )
    return {label: str(len(model.get(section, []))) for label, section in sections}
248 def init_pnf_para(self, plan_dict):
249 pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
250 pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
251 logger.debug("addpnfData ; %s" % pnfs_in_input)
252 logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
254 for pnf in pnfs_in_input:
255 for pnfd in pnfs_in_nsd:
256 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
258 pnf["nsInstances"] = self.ns_inst_id
259 # todo pnf["pnfdInfoId"]