5cd337834009b2b8bb9a32b6efde147c8fe7e1d8
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38 from lcm.ns.enum import NS_INST_STATUS
39
40 logger = logging.getLogger(__name__)
41
42
class BuildInWorkflowThread(Thread):
    """Thread that runs the built-in NS instantiation workflow in the background.

    The plan input and the operation-occurrence id are stored at construction
    time and passed unchanged to ``build_in.run_ns_instantiate`` when the
    thread is started.
    """

    def __init__(self, plan_input, occ_id):
        super(BuildInWorkflowThread, self).__init__()
        self.plan_input, self.occ_id = plan_input, occ_id

    def run(self):
        """Thread entry point: execute the built-in instantiate workflow."""
        build_in.run_ns_instantiate(self.plan_input, self.occ_id)
51
52
53 class InstantNSService(object):
54     def __init__(self, ns_inst_id, plan_content):
55         self.ns_inst_id = ns_inst_id
56         self.req_data = plan_content
57
58     def do_biz(self):
59         job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
60         occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
61         NSInstModel.objects.filter(id=self.ns_inst_id).update(status=NS_INST_STATUS.INSTANTIATING)
62
63         try:
64             logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
65             logger.debug('req_data=%s' % self.req_data)
66             ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
67             vim_id = {}
68
69             input_parameters = []
70             if 'additionalParamForNs' in self.req_data:
71                 for key, val in list(self.req_data['additionalParamForNs'].items()):
72                     input_parameters.append({"key": key, "value": val})
73
74                 if 'location' in self.req_data['additionalParamForNs']:
75                     cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
76                     cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
77                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
78                 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
79             else:
80                 params_json = json.JSONEncoder().encode({})
81
82             location_constraints = [] if not self.req_data.get('locationConstraints')\
83                 else self.req_data['locationConstraints']
84             vnf_vim = self.get_vnf_vim_info(location_constraints)
85
86             JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
87             dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
88             logger.debug('tosca plan dest: %s' % dst_plan)
89             logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
90             sdc_run_catalog.modify_nsd_state(ns_inst.nspackage_id)
91             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
92
93             params_vnf = []
94             plan_dict = json.JSONDecoder().decode(dst_plan)
95             for vnf in ignore_case_get(plan_dict, "vnfs"):
96                 vnfd_id = vnf['properties']['id']
97                 vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
98                 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
99                 if isinstance(vnfm_type, list):
100                     vnfm_type = vnfm_type[0]
101                 vimid = self.get_vnf_vim_id(vim_id, vnf_vim, vnfd_id)
102                 s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
103                 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
104
105                 params_vnf.append({
106                     "vnfProfileId": vnf["vnf_id"],
107                     "additionalParam": {
108                         "properties": json.JSONEncoder().encode(vnf['properties']),
109                         "vimId": vimid,
110                         "vnfmInstanceId": vnfm_info["vnfmId"],
111                         "vnfmType": vnfm_type,
112                         "inputs": params_json
113                     }
114                 })
115
116             self.set_vl_vim_id(vim_id, vnf_vim, plan_dict)
117             dst_plan = json.JSONEncoder().encode(plan_dict)
118             logger.debug('tosca plan dest add vimid:%s' % dst_plan)
119             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
120
121             pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
122
123             vnf_params_json = json.JSONEncoder().encode(params_vnf)
124             plan_input = {
125                 'jobId': job_id,
126                 'nsInstanceId': self.ns_inst_id,
127                 'object_context': dst_plan,
128                 'object_additionalParamForNs': params_json,
129                 'object_additionalParamForVnf': vnf_params_json,
130                 'object_additionalParamForPnf': pnf_params_json
131             }
132             plan_input.update(**self.get_model_count(dst_plan))
133
134             if 'additionalParamForNs' in self.req_data:
135                 plan_input["sdnControllerId"] = ignore_case_get(
136                     self.req_data['additionalParamForNs'], "sdncontroller")
137
138             ServiceBaseInfoModel(service_id=self.ns_inst_id,
139                                  service_name=ns_inst.name,
140                                  service_type='NFVO',
141                                  description=ns_inst.description,
142                                  active_status='--',
143                                  status=ns_inst.status,
144                                  creator='--',
145                                  create_time=int(time.time() * 1000)).save()
146
147             if config.WORKFLOW_OPTION == "wso2":
148                 service_tpl = get_servicetemplate(ns_inst.nsd_id)
149                 DefPkgMappingModel(service_id=self.ns_inst_id,
150                                    service_def_id=service_tpl['csarId'],
151                                    template_name=service_tpl['templateName'],
152                                    template_id=service_tpl['serviceTemplateId']).save()
153
154                 for key, val in list(self.req_data['additionalParamForNs'].items()):
155                     InputParamMappingModel(service_id=self.ns_inst_id,
156                                            input_key=key,
157                                            input_value=val).save()
158
159                 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
160                     VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
161                                    vnffginstid=str(uuid.uuid4()),
162                                    nsinstid=self.ns_inst_id,
163                                    endpointnumber=0).save()
164             else:
165                 # TODO
166                 pass
167             logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
168             if config.WORKFLOW_OPTION == "wso2":
169                 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
170             elif config.WORKFLOW_OPTION == "activiti":
171                 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
172             elif config.WORKFLOW_OPTION == "grapflow":
173                 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
174             else:
175                 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
176
177         except Exception as e:
178             logger.error(traceback.format_exc())
179             logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.args[0]))
180             NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.args[0])
181             JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
182             build_in.post_deal(self.ns_inst_id, "false")
183             return dict(data={'error': e.args[0]}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
184
185     def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
186         # todo occ_id
187         servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
188         process_id = get_process_id('init', servicetemplate_id)
189         data = {"processId": process_id, "params": {"planInput": plan_input}}
190         logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
191
192         ret = workflow_run(data)
193         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
194         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
195             self.ns_inst_id, ret.get('status')))
196         if ret.get('status') == 1:
197             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
198         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
199
200     def start_activiti_workflow(self, job_id, plan_input, occ_id):
201         # todo occ_id
202         plans = WFPlanModel.objects.filter()
203         if not plans:
204             raise NSLCMException("No plan is found, you should deploy plan first!")
205         data = {
206             "processId": plans[0].process_id,
207             "params": plan_input
208         }
209         ret = activiti.exec_workflow(data)
210         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
211         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
212             self.ns_inst_id, ret.get('status')))
213         if ret.get('status') == 1:
214             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
215         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
216
217     def start_buildin_workflow(self, job_id, plan_input, occ_id):
218         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
219         BuildInWorkflowThread(plan_input, occ_id).start()
220         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
221
222     def start_buildin_grapflow(self, job_id, plan_input, occ_id):
223         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
224         run_ns_instantiate(plan_input, occ_id)
225         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
226
227     @staticmethod
228     def get_vnf_vim_id(vim_id, vnf_vim, vnfdid):
229         new_vim_id = vnf_vim.get(vnfdid) if vnf_vim.get(vnfdid) else vim_id
230         if new_vim_id:
231             return new_vim_id
232         raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
233
234     @staticmethod
235     def get_vnf_vim_info(location_constraints):
236         vnf_vim = {}
237         for location in location_constraints:
238             if "vnfProfileId" in location:
239                 vnfd_id = location["vnfProfileId"]
240                 if len(location['locationConstraints']) == 1:
241                     cloud_owner = location["locationConstraints"]["vimId"].split('_')[0]
242                     cloud_regionid = location["locationConstraints"]["vimId"].split('_')[1]
243                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
244                     vnf_vim[vnfd_id] = vim_id
245                 elif len(location['locationConstraints']) == 2:
246                     cloud_owner = location["locationConstraints"]["cloudOwner"]
247                     cloud_regionid = location["locationConstraints"]["cloudRegionId"]
248                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
249                     vnf_vim[vnfd_id] = vim_id
250         return vnf_vim
251
252     @staticmethod
253     def set_vl_vim_id(vim_id, vnf_vim, plan_dict):
254         if "vls" not in plan_dict:
255             logger.debug("No vl is found in nsd.")
256             return
257         vl_vnf = {}
258         for vnf in ignore_case_get(plan_dict, "vnfs"):
259             if "dependencies" in vnf:
260                 for depend in vnf["dependencies"]:
261                     vl_vnf[depend["vl_id"]] = vnf['properties']['id']
262         for vl in plan_dict["vls"]:
263             vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
264             vimid = ignore_case_get(vnf_vim, vnfdid)
265             if not vimid:
266                 vimid = vim_id
267             if "location_info" not in vl["properties"]:
268                 vl["properties"]["location_info"] = {}
269             vl["properties"]["location_info"]["vimid"] = vimid
270
271     @staticmethod
272     def get_model_count(context):
273         data = json.JSONDecoder().decode(context)
274         vls = len(data.get('vls', []))
275         sfcs = len(data.get('fps', []))
276         vnfs = len(data.get('vnfs', []))
277         pnfs = len(data.get('pnfs', []))
278         return {'vlCount': vls, 'sfcCount': sfcs, 'vnfCount': vnfs, 'pnfCount': pnfs}
279
280     def init_pnf_para(self, plan_dict):
281         pnfs_in_input = ignore_case_get(self.req_data, "addpnfData", [])
282         pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs", [])
283         logger.debug("addpnfData ; %s" % pnfs_in_input)
284         logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
285         pnfs = {}
286         for pnf in pnfs_in_input:
287             for pnfd in pnfs_in_nsd:
288                 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
289                     k = pnfd["pnf_id"]
290                     pnf["nsInstances"] = self.ns_inst_id
291                     pnfs[k] = {
292                         "type": "CreatePnf",
293                         "input": {
294                             "content": pnf
295                         }
296                     }
297         return pnfs