optimize ns instance
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38
39 logger = logging.getLogger(__name__)
40
41
class BuildInWorkflowThread(Thread):
    """Thread that runs ``build_in.run_ns_instantiate`` off the caller's thread.

    The plan input and the operation occurrence id are stored at
    construction time and passed through unchanged when the thread runs.
    """

    def __init__(self, plan_input, occ_id):
        """Remember the arguments that ``run`` forwards to the workflow.

        :param plan_input: workflow input passed as the first argument of
            ``build_in.run_ns_instantiate``
        :param occ_id: operation occurrence id passed as the second argument
        """
        super(BuildInWorkflowThread, self).__init__()
        self.occ_id = occ_id
        self.plan_input = plan_input

    def run(self):
        """Thread body: call the workflow with the stored arguments."""
        workflow_args = (self.plan_input, self.occ_id)
        build_in.run_ns_instantiate(*workflow_args)
50
51
class InstantNSService(object):
    """Prepare the inputs of an NS instantiation and start the configured workflow.

    ``plan_content`` is the instantiate request body. The keys read from it
    in this class are ``additionalParamForNs``, ``locationConstraints`` and
    ``addpnfData``.
    """

    def __init__(self, ns_inst_id, plan_content):
        self.ns_inst_id = ns_inst_id
        self.req_data = plan_content

    def do_biz(self):
        """Build the workflow input for this NS instance and start the workflow.

        A job and an operation occurrence are created first. The NSD is then
        parsed through ``sdc_run_catalog.parse_nsd``, a VIM and a VNFM are
        resolved for every VNF in the parsed plan, the enriched plan is
        stored on the NS instance record, and the workflow selected by
        ``config.WORKFLOW_OPTION`` is started.

        :return: the dict returned by the selected ``start_*`` method; on any
            exception, a dict with ``data={'error': <message>}`` and
            ``status`` set to HTTP 500, after the operation occurrence has
            been updated to ``FAILED`` and the job status set to 255.
        """
        job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
        occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)

        try:
            logger.debug('ns-instant(%s) workflow starting...', self.ns_inst_id)
            logger.debug('req_data=%s', self.req_data)
            ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)

            input_parameters, vim_id, params_json = self._parse_ns_params()

            location_constraints = self.req_data.get('locationConstraints') or []
            vnf_vim = self.get_vnf_vim_info(location_constraints)

            JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
            dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
            logger.debug('tosca plan dest: %s', dst_plan)
            logger.debug('Start query nsd(%s)', ns_inst.nspackage_id)
            NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)

            plan_dict = json.loads(dst_plan)
            params_vnf = self._build_vnf_params(plan_dict, vim_id, vnf_vim, params_json)

            # set_vl_vim_id mutates plan_dict, so the plan is re-encoded and
            # stored a second time with the VIM ids filled in.
            self.set_vl_vim_id(vim_id, vnf_vim, plan_dict)
            dst_plan = json.dumps(plan_dict)
            logger.debug('tosca plan dest add vimid:%s', dst_plan)
            NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)

            pnf_params_json = json.dumps(self.init_pnf_para(plan_dict))
            vnf_params_json = json.dumps(params_vnf)
            plan_input = {
                'jobId': job_id,
                'nsInstanceId': self.ns_inst_id,
                'object_context': dst_plan,
                'object_additionalParamForNs': params_json,
                'object_additionalParamForVnf': vnf_params_json,
                'object_additionalParamForPnf': pnf_params_json
            }
            plan_input.update(**self.get_model_count(dst_plan))

            if 'additionalParamForNs' in self.req_data:
                plan_input["sdnControllerId"] = ignore_case_get(
                    self.req_data['additionalParamForNs'], "sdncontroller")

            ServiceBaseInfoModel(service_id=self.ns_inst_id,
                                 service_name=ns_inst.name,
                                 service_type='NFVO',
                                 description=ns_inst.description,
                                 active_status='--',
                                 status=ns_inst.status,
                                 creator='--',
                                 create_time=int(time.time() * 1000)).save()

            workflow_option = config.WORKFLOW_OPTION
            if workflow_option == "wso2":
                self._save_wso2_mappings(ns_inst, plan_dict)
            # TODO: the other workflow options persist nothing extra here.
            logger.debug("workflow option: %s", workflow_option)

            if workflow_option == "wso2":
                return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
            elif workflow_option == "activiti":
                return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
            elif workflow_option == "grapflow":
                return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
            else:
                return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)

        except Exception as e:
            # Resolve the message once and safely: an exception raised without
            # arguments must not make this handler fail before the job and the
            # operation occurrence are marked as failed.
            error_msg = self._get_error_message(e)
            logger.error(traceback.format_exc())
            logger.error("ns-instant(%s) workflow error:%s", self.ns_inst_id, error_msg)
            NsLcmOpOcc.update(occ_id, operationState="FAILED", error=error_msg)
            JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
            return dict(data={'error': error_msg}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @staticmethod
    def _get_error_message(exc):
        """Return ``exc.args[0]``, or the exception class name when it has no args."""
        return exc.args[0] if exc.args else exc.__class__.__name__

    @staticmethod
    def _split_vim_id(vim_id_str):
        """Split an ``"<cloud_owner>_<cloud_regionid>"`` string into a vim id dict.

        Only the first two ``_``-separated segments are used. A string
        without ``_`` raises ``IndexError``.
        """
        parts = vim_id_str.split('_')
        return {"cloud_owner": parts[0], "cloud_regionid": parts[1]}

    def _parse_ns_params(self):
        """Derive workflow inputs from the request's ``additionalParamForNs``.

        :return: tuple ``(input_parameters, vim_id, params_json)`` where
            ``input_parameters`` is a list of ``{"key", "value"}`` dicts,
            ``vim_id`` is the vim id dict built from the ``location``
            parameter (``{}`` when absent) and ``params_json`` is the JSON
            encoding of the parameters (``"{}"`` when the key is absent).
        """
        if 'additionalParamForNs' not in self.req_data:
            return [], {}, json.dumps({})

        ns_params = self.req_data['additionalParamForNs']
        input_parameters = [{"key": key, "value": val} for key, val in ns_params.items()]
        vim_id = {}
        if 'location' in ns_params:
            vim_id = self._split_vim_id(ns_params['location'])
        return input_parameters, vim_id, json.dumps(ns_params)

    def _build_vnf_params(self, plan_dict, vim_id, vnf_vim, params_json):
        """Build the ``additionalParamForVnf`` entries, one per VNF in the plan.

        For each VNF the VIM is resolved with ``get_vnf_vim_id`` and a VNFM
        is selected through ``extsys.select_vnfm``.

        :raises NSLCMException: when no VIM can be resolved for a VNF
        """
        params_vnf = []
        for vnf in ignore_case_get(plan_dict, "vnfs"):
            properties = vnf['properties']
            vnfd_id = properties['id']
            vnfm_type = properties.get("vnfm_info", "undefined")
            logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
            # When vnfm_info is a list, only its first entry is used.
            if isinstance(vnfm_type, list):
                vnfm_type = vnfm_type[0]
            vimid = self.get_vnf_vim_id(vim_id, vnf_vim, vnfd_id)
            s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
            vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)

            params_vnf.append({
                "vnfProfileId": vnf["vnf_id"],
                "additionalParam": {
                    "properties": json.dumps(properties),
                    "vimId": vimid,
                    "vnfmInstanceId": vnfm_info["vnfmId"],
                    "vnfmType": vnfm_type,
                    "inputs": params_json
                }
            })
        return params_vnf

    def _save_wso2_mappings(self, ns_inst, plan_dict):
        """Persist the records saved only when the wso2 workflow option is active."""
        service_tpl = get_servicetemplate(ns_inst.nsd_id)
        DefPkgMappingModel(service_id=self.ns_inst_id,
                           service_def_id=service_tpl['csarId'],
                           template_name=service_tpl['templateName'],
                           template_id=service_tpl['serviceTemplateId']).save()

        # additionalParamForNs is optional everywhere else in do_biz, so a
        # request without it must not fail here with a KeyError.
        for key, val in self.req_data.get('additionalParamForNs', {}).items():
            InputParamMappingModel(service_id=self.ns_inst_id,
                                   input_key=key,
                                   input_value=val).save()

        for vnffg in ignore_case_get(plan_dict, "vnffgs"):
            VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
                           vnffginstid=str(uuid.uuid4()),
                           nsinstid=self.ns_inst_id,
                           endpointnumber=0).save()

    def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
        """Start the wso2 'init' process for the NSD's service template.

        :return: ``{'jobId': ...}`` with HTTP 200 and ``occ_id`` when the
            workflow reports status 1, otherwise the workflow's message
            with HTTP 500.
        """
        # TODO: occ_id is only echoed back, it is not passed to the workflow.
        servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
        process_id = get_process_id('init', servicetemplate_id)
        data = {"processId": process_id, "params": {"planInput": plan_input}}
        logger.debug('ns-instant(%s) workflow data:%s', self.ns_inst_id, data)

        ret = workflow_run(data)
        logger.info("ns-instant(%s) workflow result:%s", self.ns_inst_id, ret)
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
            self.ns_inst_id, ret.get('status')))
        if ret.get('status') == 1:
            return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
        return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def start_activiti_workflow(self, job_id, plan_input, occ_id):
        """Run the first stored workflow plan through activiti.

        :raises NSLCMException: when no ``WFPlanModel`` record exists
        :return: ``{'jobId': ...}`` with HTTP 200 and ``occ_id`` when the
            workflow reports status 1, otherwise the workflow's message
            with HTTP 500.
        """
        # TODO: occ_id is only echoed back, it is not passed to the workflow.
        plans = WFPlanModel.objects.filter()
        if not plans:
            raise NSLCMException("No plan is found, you should deploy plan first!")
        data = {
            "processId": plans[0].process_id,
            "params": plan_input
        }
        ret = activiti.exec_workflow(data)
        logger.info("ns-instant(%s) workflow result:%s", self.ns_inst_id, ret)
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
            self.ns_inst_id, ret.get('status')))
        if ret.get('status') == 1:
            return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
        return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def start_buildin_workflow(self, job_id, plan_input, occ_id):
        """Start the built-in workflow on a ``BuildInWorkflowThread`` and return at once."""
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
        BuildInWorkflowThread(plan_input, occ_id).start()
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)

    def start_buildin_grapflow(self, job_id, plan_input, occ_id):
        """Call ``run_ns_instantiate`` directly from this thread, then return the job id."""
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
        run_ns_instantiate(plan_input, occ_id)
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)

    @staticmethod
    def get_vnf_vim_id(vim_id, vnf_vim, vnfdid):
        """Return the VIM for a VNF: its own entry in ``vnf_vim``, else ``vim_id``.

        :raises NSLCMException: when both candidates are missing or empty
        """
        new_vim_id = vnf_vim.get(vnfdid) or vim_id
        if new_vim_id:
            return new_vim_id
        raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)

    @staticmethod
    def get_vnf_vim_info(location_constraints):
        """Map each ``vnfProfileId`` to its vim id dict.

        Entries without ``vnfProfileId`` are skipped. A one-item
        ``locationConstraints`` is read as ``{"vimId": "<owner>_<region>"}``,
        a two-item one as ``{"cloudOwner": ..., "cloudRegionId": ...}``.
        Any other size is ignored.
        """
        vnf_vim = {}
        for location in location_constraints:
            if "vnfProfileId" not in location:
                continue
            vnfd_id = location["vnfProfileId"]
            constraints = location['locationConstraints']
            if len(constraints) == 1:
                vnf_vim[vnfd_id] = InstantNSService._split_vim_id(constraints["vimId"])
            elif len(constraints) == 2:
                vnf_vim[vnfd_id] = {
                    "cloud_owner": constraints["cloudOwner"],
                    "cloud_regionid": constraints["cloudRegionId"]
                }
        return vnf_vim

    @staticmethod
    def set_vl_vim_id(vim_id, vnf_vim, plan_dict):
        """Write a ``vimid`` into ``properties.location_info`` of every VL, in place.

        A VL takes the VIM of the VNF that lists it in ``dependencies``
        (when several VNFs list it, the last one wins) and falls back to
        ``vim_id`` when that lookup yields nothing.
        """
        if "vls" not in plan_dict:
            logger.debug("No vl is found in nsd.")
            return
        vl_vnf = {}
        for vnf in ignore_case_get(plan_dict, "vnfs"):
            for depend in vnf.get("dependencies", []):
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']
        for vl in plan_dict["vls"]:
            vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
            vimid = ignore_case_get(vnf_vim, vnfdid) or vim_id
            vl["properties"].setdefault("location_info", {})["vimid"] = vimid

    @staticmethod
    def get_model_count(context):
        """Count the ``vls``, ``fps``, ``vnfs`` and ``pnfs`` entries of a JSON plan string."""
        data = json.loads(context)
        return {
            'vlCount': len(data.get('vls', [])),
            'sfcCount': len(data.get('fps', [])),
            'vnfCount': len(data.get('vnfs', [])),
            'pnfCount': len(data.get('pnfs', []))
        }

    def init_pnf_para(self, plan_dict):
        """Build the ``CreatePnf`` inputs for requested PNFs that match the NSD.

        A PNF from the request's ``addpnfData`` matches an NSD PNF when its
        ``pnfdId`` equals the NSD entry's ``properties.descriptor_id``.
        Matching request entries get ``nsInstances`` set on them, so the
        request data is modified in place.

        :return: dict keyed by the NSD ``pnf_id``
        """
        pnfs_in_input = ignore_case_get(self.req_data, "addpnfData", [])
        pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs", [])
        logger.debug("addpnfData ; %s", pnfs_in_input)
        logger.debug("pnfs_in_nsd ; %s", pnfs_in_nsd)
        pnfs = {}
        for pnf in pnfs_in_input:
            for pnfd in pnfs_in_nsd:
                if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
                    pnf["nsInstances"] = self.ns_inst_id
                    pnfs[pnfd["pnf_id"]] = {
                        "type": "CreatePnf",
                        "input": {
                            "content": pnf
                        }
                    }
        return pnfs