Split the vim_id in LCM
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38
# Module-level logger named after this module; used by every class below.
logger = logging.getLogger(__name__)
40
41
class BuildInWorkflowThread(Thread):
    """Thread wrapper that runs the build-in NS instantiate workflow."""

    def __init__(self, plan_input, occ_id):
        """Keep the workflow arguments until the thread is started.

        :param plan_input: plan input handed unchanged to the workflow
        :param occ_id: operation occurrence id handed unchanged to the workflow
        """
        super(BuildInWorkflowThread, self).__init__()
        self.occ_id = occ_id
        self.plan_input = plan_input

    def run(self):
        """Thread body: delegate to ``build_in.run_ns_instantiate``."""
        plan_input, occ_id = self.plan_input, self.occ_id
        build_in.run_ns_instantiate(plan_input, occ_id)
50
51
class InstantNSService(object):
    """Prepare and launch the instantiation workflow of one NS instance.

    ``do_biz`` is the entry point. It parses the NSD of the instance, works
    out which VIM every VNF and VL is placed on, stores the bookkeeping rows
    and finally starts the workflow engine selected by
    ``config.WORKFLOW_OPTION``.

    A VIM is represented throughout as a dict of the form
    ``{"cloud_owner": ..., "cloud_regionid": ...}``.
    """

    def __init__(self, ns_inst_id, plan_content):
        """Remember the target NS instance and the raw request body.

        :param ns_inst_id: id used to look up the ``NSInstModel`` row
        :param plan_content: request data; the keys read from it are
            ``additionalParamForNs``, ``locationConstraints`` and
            ``addpnfData``
        """
        self.ns_inst_id = ns_inst_id
        self.req_data = plan_content

    def do_biz(self):
        """Run the whole instantiation preparation and start the workflow.

        :return: a dict with ``data`` and ``status`` keys (the workflow
            starters add ``occ_id`` on success). Any exception raised while
            preparing is logged, recorded on the job and the operation
            occurrence, and turned into an HTTP 500 result instead of
            propagating.
        """
        job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
        occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)

        try:
            logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
            logger.debug('req_data=%s' % self.req_data)
            ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)

            # 'additionalParamForNs' is optional; an empty dict stands in
            # for it everywhere below so no later step can hit a KeyError.
            has_ns_params = 'additionalParamForNs' in self.req_data
            ns_params = self.req_data['additionalParamForNs'] if has_ns_params else {}
            input_parameters = [{"key": key, "value": val} for key, val in ns_params.items()]

            # NS-wide default VIM, used when no per-VNF constraint matches.
            vim_id = {}
            if 'location' in ns_params:
                vim_id = self._split_vim_id(ns_params['location'])
                logger.info('vim_id::::::::::::%s', type(vim_id))
            params_json = json.dumps(ns_params)

            location_constraints = []
            if 'locationConstraints' in self.req_data:
                location_constraints = self.req_data['locationConstraints']

            JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
            dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
            logger.debug('tosca plan dest: %s' % dst_plan)
            logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
            NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)

            plan_dict = json.loads(dst_plan)
            params_vnf = self._build_vnf_params(plan_dict, vim_id, location_constraints, params_json)

            # Stamp the chosen VIM on every VL, then persist the enriched plan.
            self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
            dst_plan = json.dumps(plan_dict)
            logger.debug('tosca plan dest add vimid:%s' % dst_plan)
            NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)

            pnf_params_json = json.dumps(self.init_pnf_para(plan_dict))
            vnf_params_json = json.dumps(params_vnf)
            plan_input = {
                'jobId': job_id,
                'nsInstanceId': self.ns_inst_id,
                'object_context': dst_plan,
                'object_additionalParamForNs': params_json,
                'object_additionalParamForVnf': vnf_params_json,
                'object_additionalParamForPnf': pnf_params_json
            }
            plan_input.update(**self.get_model_count(dst_plan))

            if has_ns_params:
                plan_input["sdnControllerId"] = ignore_case_get(ns_params, "sdncontroller")

            ServiceBaseInfoModel(service_id=self.ns_inst_id,
                                 service_name=ns_inst.name,
                                 service_type='NFVO',
                                 description=ns_inst.description,
                                 active_status='--',
                                 status=ns_inst.status,
                                 creator='--',
                                 create_time=int(time.time() * 1000)).save()

            workflow_option = config.WORKFLOW_OPTION
            if workflow_option == "wso2":
                self._save_wso2_records(ns_inst, ns_params, plan_dict)
            logger.debug("workflow option: %s" % workflow_option)

            if workflow_option == "wso2":
                return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
            if workflow_option == "activiti":
                return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
            if workflow_option == "grapflow":
                return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
            return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)

        except Exception as e:
            error_text = self._error_text(e)
            logger.error(traceback.format_exc())
            logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, error_text))
            NsLcmOpOcc.update(occ_id, operationState="FAILED", error=error_text)
            JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
            return dict(data={'error': error_text}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def _build_vnf_params(self, plan_dict, vim_id, location_constraints, params_json):
        """Build the per-VNF parameter list handed to the workflow.

        For every VNF of the plan the target VIM is resolved and a VNFM is
        selected through ``extsys.select_vnfm``.
        """
        params_vnf = []
        for vnf in ignore_case_get(plan_dict, "vnfs"):
            vnfd_id = vnf['properties']['id']
            vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
            logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
            # vnfm_info may be a list; only its first entry is used.
            if isinstance(vnfm_type, list):
                vnfm_type = vnfm_type[0]
            vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
            vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)

            params_vnf.append({
                "vnfProfileId": vnf["vnf_id"],
                "additionalParam": {
                    "properties": json.dumps(vnf['properties']),
                    "vimId": vimid,
                    "vnfmInstanceId": vnfm_info["vnfmId"],
                    "vnfmType": vnfm_type,
                    "inputs": params_json
                }
            })
        return params_vnf

    def _save_wso2_records(self, ns_inst, ns_params, plan_dict):
        """Store the mapping rows that are only written for the wso2 option."""
        service_tpl = get_servicetemplate(ns_inst.nsd_id)
        DefPkgMappingModel(service_id=self.ns_inst_id,
                           service_def_id=service_tpl['csarId'],
                           template_name=service_tpl['templateName'],
                           template_id=service_tpl['serviceTemplateId']).save()

        # ns_params is {} when the request carried no additionalParamForNs.
        for key, val in ns_params.items():
            InputParamMappingModel(service_id=self.ns_inst_id,
                                   input_key=key,
                                   input_value=val).save()

        for vnffg in ignore_case_get(plan_dict, "vnffgs"):
            VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
                           vnffginstid=str(uuid.uuid4()),
                           nsinstid=self.ns_inst_id,
                           endpointnumber=0).save()

    @staticmethod
    def _error_text(err):
        """Return a printable message for ``err``.

        Uses the ``message`` attribute when the exception has a non-empty
        one and falls back to ``str(err)`` otherwise, because Python 3
        exceptions have no ``message`` attribute.
        """
        return getattr(err, 'message', None) or str(err)

    @staticmethod
    def _split_vim_id(vim_id):
        """Turn a ``"<cloudOwner>_<cloudRegionId>"`` string into a VIM dict.

        Only the first underscore separates the two parts, so a region id
        that itself contains underscores is kept whole.

        :raises NSLCMException: when the string contains no underscore
        """
        cloud_owner, separator, cloud_regionid = vim_id.partition('_')
        if not separator:
            raise NSLCMException(
                "Invalid vim id(%s), expected <cloudOwner>_<cloudRegionId>." % vim_id)
        return {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}

    @staticmethod
    def _vim_from_constraints(constraints):
        """Build a VIM dict from one ``locationConstraints`` entry.

        A combined ``vimId`` is preferred; otherwise the separate
        ``cloudOwner`` / ``cloudRegionId`` keys are read.
        """
        if 'vimId' in constraints:
            return InstantNSService._split_vim_id(constraints["vimId"])
        return {"cloud_owner": constraints["cloudOwner"],
                "cloud_regionid": constraints["cloudRegionId"]}

    def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
        """Start the 'init' process of the NS service template via wso2.

        :return: HTTP 200 result with the job id when the engine reports
            status 1, otherwise an HTTP 500 result with its message
        """
        servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
        process_id = get_process_id('init', servicetemplate_id)
        data = {"processId": process_id, "params": {"planInput": plan_input}}
        logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))

        ret = workflow_run(data)
        logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
            self.ns_inst_id, ret.get('status')))
        if ret.get('status') == 1:
            # occ_id is returned here as it is by the other workflow starters.
            return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
        return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def start_activiti_workflow(self, job_id, plan_input, occ_id):
        """Run the first deployed plan through activiti.

        :raises NSLCMException: when no plan has been deployed
        """
        # todo occ_id
        plans = WFPlanModel.objects.filter()
        if not plans:
            raise NSLCMException("No plan is found, you should deploy plan first!")
        data = {
            "processId": plans[0].process_id,
            "params": plan_input
        }
        ret = activiti.exec_workflow(data)
        logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
            self.ns_inst_id, ret.get('status')))
        if ret.get('status') == 1:
            return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
        return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def start_buildin_workflow(self, job_id, plan_input, occ_id):
        """Start the build-in workflow on a background thread."""
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
        BuildInWorkflowThread(plan_input, occ_id).start()
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)

    def start_buildin_grapflow(self, job_id, plan_input, occ_id):
        """Call ``run_ns_instantiate`` of the graph flow directly."""
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
        run_ns_instantiate(plan_input, occ_id)
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)

    @staticmethod
    def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
        """Resolve the VIM a VNF is placed on.

        :param vim_id: NS-wide default VIM dict (may be empty)
        :param location_constraints: list of per-VNF constraint entries
        :param vnfdid: id matched against each entry's ``vnfProfileId``
        :return: the VIM of the first matching constraint, else ``vim_id``
        :raises NSLCMException: when nothing matches and ``vim_id`` is empty
        """
        for location in location_constraints:
            if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
                return InstantNSService._vim_from_constraints(location['locationConstraints'])
        if vim_id:
            return vim_id
        raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)

    @staticmethod
    def set_vl_vim_id(vim_id, location_constraints, plan_dict):
        """Write the target VIM into ``location_info`` of every VL in place.

        A VL takes the VIM constrained for the VNF that depends on it and
        falls back to the NS-wide ``vim_id`` otherwise.

        :raises NSLCMException: when neither source yields a VIM for a VL
        """
        if "vls" not in plan_dict:
            logger.debug("No vl is found in nsd.")
            return

        # vl_id -> id of the VNF that lists the VL among its dependencies.
        vl_vnf = {}
        for vnf in ignore_case_get(plan_dict, "vnfs"):
            if "dependencies" in vnf:
                for depend in vnf["dependencies"]:
                    vl_vnf[depend["vl_id"]] = vnf['properties']['id']

        # vnfProfileId -> VIM dict. A separate local is used on purpose:
        # rebinding ``vim_id`` here would replace the NS-wide default with
        # the last per-VNF constraint.
        vnf_vim = {}
        for location in location_constraints:
            if "vnfProfileId" in location:
                vnf_vim[location["vnfProfileId"]] = InstantNSService._vim_from_constraints(
                    location["locationConstraints"])

        for vl in plan_dict["vls"]:
            vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
            vimid = ignore_case_get(vnf_vim, vnfdid) or vim_id
            if not vimid:
                raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
            vl["properties"].setdefault("location_info", {})["vimid"] = vimid

    @staticmethod
    def get_model_count(context):
        """Count the VLs, forwarding paths, VNFs and PNFs of a JSON plan.

        :param context: the plan as a JSON string
        :return: dict of stringified counts keyed ``vlCount``, ``sfcCount``,
            ``vnfCount`` and ``pnfCount``
        """
        data = json.loads(context)
        return {
            'vlCount': str(len(data.get('vls', []))),
            'sfcCount': str(len(data.get('fps', []))),
            'vnfCount': str(len(data.get('vnfs', []))),
            'pnfCount': str(len(data.get('pnfs', []))),
        }

    def init_pnf_para(self, plan_dict):
        """Build the ``CreatePnf`` inputs for the requested PNFs.

        Every ``addpnfData`` entry of the request whose ``pnfdId`` equals the
        ``descriptor_id`` of a PNF in the plan is tagged with this NS
        instance id (the entry is modified in place) and stored under that
        PNF's ``pnf_id``.
        """
        pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
        pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
        logger.debug("addpnfData ; %s" % pnfs_in_input)
        logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
        pnfs = {}
        for pnf in pnfs_in_input:
            for pnfd in pnfs_in_nsd:
                if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
                    pnf["nsInstances"] = self.ns_inst_id
                    pnfs[pnfd["pnf_id"]] = {
                        "type": "CreatePnf",
                        "input": {
                            "content": pnf
                        }
                    }
        return pnfs