1 # Copyright 2016-2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from threading import Thread
21 from rest_framework import status
23 from lcm.pub.config.config import WORKFLOW_OPTION
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id, query_rawdata_from_catalog
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi.extsys import select_vnfm
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils import toscautil
34 from lcm.pub.utils.jobutil import JobUtil
35 from lcm.pub.utils.values import ignore_case_get
36 from lcm.workflows import build_in
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
41 class BuildInWorkflowThread(Thread):
42 def __init__(self, plan_input):
44 self.plan_input = plan_input
47 build_in.run_ns_instantiate(self.plan_input)
50 class InstantNSService(object):
51 def __init__(self, ns_inst_id, plan_content):
52 self.ns_inst_id = ns_inst_id
53 self.req_data = plan_content
57 job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
58 logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
59 logger.debug('req_data=%s' % self.req_data)
60 ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
63 for key, val in self.req_data['additionalParamForNs'].items():
64 input_parameters.append({"key": key, "value": val})
67 if 'location' in self.req_data['additionalParamForNs']:
68 vim_id = self.req_data['additionalParamForNs']['location']
69 location_constraints = []
70 if 'locationConstraints' in self.req_data:
71 location_constraints = self.req_data['locationConstraints']
73 JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
75 if WORKFLOW_OPTION == "wso2":
76 src_plan = query_rawdata_from_catalog(ns_inst.nspackage_id, input_parameters)
77 dst_plan = toscautil.convert_nsd_model(src_plan["rawData"])
79 dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
80 logger.debug('tosca plan dest:%s' % dst_plan)
82 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
84 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
87 plan_dict = json.JSONDecoder().decode(dst_plan)
88 for vnf in ignore_case_get(plan_dict, "vnfs"):
89 vnfd_id = vnf['properties']['id']
90 vnfm_type = vnf['properties'].get("nf_type", "undefined")
91 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
92 vnfm_info = select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
94 "vnfProfileId": vnf["vnf_id"],
96 "properties": json.JSONEncoder().encode(vnf['properties']),
98 "vnfmInstanceId": vnfm_info["vnfmId"],
99 "vnfmType": vnfm_type,
100 "inputs": params_json
105 self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
106 dst_plan = json.JSONEncoder().encode(plan_dict)
107 logger.debug('tosca plan dest add vimid:%s' % dst_plan)
108 NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
110 vnf_params_json = json.JSONEncoder().encode(params_vnf)
113 'nsInstanceId': self.req_data["nsInstanceId"],
114 'object_context': dst_plan,
115 'object_additionalParamForNs': params_json,
116 'object_additionalParamForVnf': vnf_params_json
118 plan_input.update(**self.get_model_count(dst_plan))
119 plan_input["sdnControllerId"] = ignore_case_get(
120 self.req_data['additionalParamForNs'], "sdncontroller")
122 ServiceBaseInfoModel(service_id=self.ns_inst_id,
123 service_name=ns_inst.name,
125 description=ns_inst.description,
127 status=ns_inst.status,
129 create_time=int(time.time() * 1000)).save()
131 if WORKFLOW_OPTION == "wso2":
132 service_tpl = get_servicetemplate(ns_inst.nsd_id)
133 DefPkgMappingModel(service_id=self.ns_inst_id,
134 service_def_id=service_tpl['csarId'],
135 template_name=service_tpl['templateName'],
136 template_id=service_tpl['serviceTemplateId']).save()
138 for key, val in self.req_data['additionalParamForNs'].items():
139 InputParamMappingModel(service_id=self.ns_inst_id,
141 input_value=val).save()
143 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
144 VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
145 vnffginstid=str(uuid.uuid4()),
146 nsinstid=self.ns_inst_id,
147 endpointnumber=0).save()
152 if WORKFLOW_OPTION == "wso2":
153 return self.start_wso2_workflow(job_id, ns_inst, plan_input)
154 elif WORKFLOW_OPTION == "activiti":
155 return self.start_activiti_workflow()
157 return self.start_buildin_workflow(job_id, plan_input)
159 except Exception as e:
160 logger.error(traceback.format_exc())
161 logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
162 JobUtil.add_job_status(job_id, 255, 'NS instantiation failed: %s' % e.message)
163 return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_wso2_workflow(self, job_id, ns_inst, plan_input):
    """Submit the 'init' plan for this NS to the WSO2 workflow engine.

    The service template id is obtained from ``get_servicetemplate_id`` for
    ``ns_inst.nsd_id`` and the process id from ``get_process_id`` for the
    'init' plan of that template. The request is sent with ``workflow_run``
    and the returned status is recorded on the job at progress 10.

    :param job_id: id of the job tracking this instantiation.
    :param ns_inst: NS instance record; only ``nsd_id`` is read here.
    :param plan_input: value passed to the workflow as ``planInput``.
    :return: dict with ``data`` and ``status`` keys: HTTP 200 with the job
        id when the workflow result has status 1, otherwise HTTP 500 with
        the ``message`` entry of the workflow result.
    """
    template_id = get_servicetemplate_id(ns_inst.nsd_id)
    request_body = {
        "processId": get_process_id('init', template_id),
        "params": {"planInput": plan_input},
    }
    logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, request_body))

    result = workflow_run(request_body)
    logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, result))

    # Read the status once; it drives both the job record and the response.
    run_status = result.get('status')
    progress_desc = 'NS inst(%s) wso2 workflow started: %s' % (self.ns_inst_id, run_status)
    JobUtil.add_job_status(job_id, 10, progress_desc)

    if run_status == 1:
        payload, http_code = {'jobId': job_id}, status.HTTP_200_OK
    else:
        payload, http_code = {'error': result['message']}, status.HTTP_500_INTERNAL_SERVER_ERROR
    return {'data': payload, 'status': http_code}
179 def start_activiti_workflow(self, job_id, plan_input):
180 plans = WFPlanModel.objects.filter()
182 raise NSLCMException("No plan is found, you should deploy plan first!")
184 "processId": plans[0].process_id,
187 ret = activiti.exec_workflow(data)
188 logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
189 JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
190 self.ns_inst_id, ret.get('status')))
191 if ret.get('status') == 1:
192 return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
193 return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_buildin_workflow(self, job_id, plan_input):
    """Record the start on the job and launch the build-in workflow thread.

    :param job_id: id of the job tracking this instantiation.
    :param plan_input: value handed to ``BuildInWorkflowThread``.
    :return: dict with ``data`` holding the job id and ``status`` set to
        HTTP 200; the thread is started but not joined here.
    """
    progress_desc = 'NS inst(%s) buildin workflow started.' % self.ns_inst_id
    JobUtil.add_job_status(job_id, 10, progress_desc)

    worker = BuildInWorkflowThread(plan_input)
    worker.start()
    return {'data': {'jobId': job_id}, 'status': status.HTTP_200_OK}
200 def get_vnf_vim_id(self, vim_id, location_constraints, vnfdid):
201 for location in location_constraints:
202 if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
203 return location["locationConstraints"]["vimId"]
206 raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
208 def set_vl_vim_id(self, vim_id, location_constraints, plan_dict):
209 if "vls" not in plan_dict:
210 logger.debug("No vl is found in nsd.")
213 for vnf in ignore_case_get(plan_dict, "vnfs"):
214 if "dependencies" in vnf:
215 for depend in vnf["dependencies"]:
216 vl_vnf[depend["vl_id"]] = vnf['properties']['id']
218 for location in location_constraints:
219 if "vnfProfileId" in location:
220 vnfd_id = location["vnfProfileId"]
221 vnf_vim[vnfd_id] = location["locationConstraints"]["vimId"]
222 for vl in plan_dict["vls"]:
223 vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
224 vimid = ignore_case_get(vnf_vim, vnfdid)
228 raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
229 if "location_info" not in vl["properties"]:
230 vl["properties"]["location_info"] = {}
231 vl["properties"]["location_info"]["vimid"] = vimid
def get_model_count(context):
    """Count the VLs, forwarding paths and VNFs in a JSON-encoded model.

    :param context: JSON text whose top level is an object; the ``vls``,
        ``fps`` and ``vnfs`` entries are counted, a missing entry counts
        as zero.
    :return: dict with ``vlCount``, ``sfcCount`` and ``vnfCount``, each
        count rendered as a string.
    """
    model = json.JSONDecoder().decode(context)
    # (output key, model key) pairs, in the order the counts are produced.
    count_keys = (('vlCount', 'vls'), ('sfcCount', 'fps'), ('vnfCount', 'vnfs'))
    return {out_key: str(len(model.get(src_key, []))) for out_key, src_key in count_keys}