add comments
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38 from lcm.ns.enum import NS_INST_STATUS
39
40 logger = logging.getLogger(__name__)
41
42
class BuildInWorkflowThread(Thread):
    """
    Thread wrapper that runs the built-in NS instantiate workflow.

    The constructor only stores its arguments; the workflow itself is
    executed when the thread is started.
    """

    def __init__(self, plan_input, occ_id):
        """
        :param plan_input: workflow input handed to ``build_in.run_ns_instantiate``
        :param occ_id: NS LCM operation occurrence id handed to the workflow
        """
        super(BuildInWorkflowThread, self).__init__()
        self.occ_id = occ_id
        self.plan_input = plan_input

    def run(self):
        """Thread entry point: invoke the built-in workflow with the stored arguments."""
        build_in.run_ns_instantiate(self.plan_input, self.occ_id)
51
52
class InstantNSService(object):
    """
    Instantiate NS

    Prepares the inputs for one NS instance (parsed NSD, per-VNF parameters,
    PNF parameters, VIM placement) and starts the workflow engine selected by
    ``config.WORKFLOW_OPTION``.
    """

    def __init__(self, ns_inst_id, plan_content):
        """
        :param ns_inst_id: id of the NS instance to instantiate
        :param plan_content: instantiate request body (dict). The keys read
            by this class are ``additionalParamForNs``, ``locationConstraints``
            and ``addpnfData``.
        """
        self.ns_inst_id = ns_inst_id
        self.req_data = plan_content

    def do_biz(self):
        """
        Run the instantiate operation.

        Creates the job and the operation occurrence, marks the NS instance
        as INSTANTIATING, builds the workflow input and starts the configured
        workflow.

        :return: dict with ``data`` and ``status``. On success ``data`` holds
            the ``jobId`` and the dict also carries ``occ_id``; on failure
            ``data`` holds ``error`` and ``status`` is HTTP 500.
        """
        job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
        occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
        NSInstModel.objects.filter(id=self.ns_inst_id).update(status=NS_INST_STATUS.INSTANTIATING)

        try:
            logger.debug('ns-instant(%s) workflow starting...', self.ns_inst_id)
            logger.debug('req_data=%s', self.req_data)
            ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)

            input_parameters, vim_id, params_json = self._parse_ns_params()
            location_constraints = self.req_data.get('locationConstraints') or []
            vnf_vim = self.get_vnf_vim_info(location_constraints)

            JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
            dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
            logger.debug('tosca plan dest: %s', dst_plan)
            logger.debug('Start query nsd(%s)', ns_inst.nspackage_id)
            sdc_run_catalog.modify_nsd_state(ns_inst.nspackage_id)
            NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)

            plan_dict = json.loads(dst_plan)
            params_vnf = self._build_vnf_params(plan_dict, vim_id, vnf_vim, params_json)

            # Stamp each VL with its VIM, then persist the enriched model.
            self.set_vl_vim_id(vim_id, vnf_vim, plan_dict)
            dst_plan = json.dumps(plan_dict)
            logger.debug('tosca plan dest add vimid:%s', dst_plan)
            NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)

            pnf_params_json = json.dumps(self.init_pnf_para(plan_dict))
            vnf_params_json = json.dumps(params_vnf)
            plan_input = {
                'jobId': job_id,
                'nsInstanceId': self.ns_inst_id,
                'object_context': dst_plan,
                'object_additionalParamForNs': params_json,
                'object_additionalParamForVnf': vnf_params_json,
                'object_additionalParamForPnf': pnf_params_json
            }
            plan_input.update(**self.get_model_count(dst_plan))

            if 'additionalParamForNs' in self.req_data:
                plan_input["sdnControllerId"] = ignore_case_get(
                    self.req_data['additionalParamForNs'], "sdncontroller")

            ServiceBaseInfoModel(service_id=self.ns_inst_id,
                                 service_name=ns_inst.name,
                                 service_type='NFVO',
                                 description=ns_inst.description,
                                 active_status='--',
                                 status=ns_inst.status,
                                 creator='--',
                                 create_time=int(time.time() * 1000)).save()

            workflow_option = config.WORKFLOW_OPTION
            if workflow_option == "wso2":
                self._save_wso2_mappings(ns_inst, plan_dict)
            logger.debug("workflow option: %s", workflow_option)

            if workflow_option == "wso2":
                return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
            if workflow_option == "activiti":
                return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
            if workflow_option == "grapflow":
                return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
            return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)

        except Exception as e:
            logger.error(traceback.format_exc())
            # An exception raised without arguments has empty ``args``;
            # indexing it blindly would raise inside this handler and skip
            # the failure bookkeeping below.
            error_msg = e.args[0] if e.args else e.__class__.__name__
            logger.error("ns-instant(%s) workflow error:%s", self.ns_inst_id, error_msg)
            NsLcmOpOcc.update(occ_id, operationState="FAILED", error=error_msg)
            JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
            build_in.post_deal(self.ns_inst_id, "false")
            return dict(data={'error': error_msg}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def _parse_ns_params(self):
        """Return (input parameter list, default vim id dict, JSON text) from ``additionalParamForNs``."""
        if 'additionalParamForNs' not in self.req_data:
            return [], {}, json.dumps({})

        ns_params = self.req_data['additionalParamForNs']
        input_parameters = [{"key": key, "value": val} for key, val in ns_params.items()]
        vim_id = {}
        if 'location' in ns_params:
            vim_id = self._split_vim_id(ns_params['location'])
        return input_parameters, vim_id, json.dumps(ns_params)

    def _build_vnf_params(self, plan_dict, vim_id, vnf_vim, params_json):
        """Build the per-VNF workflow parameters, selecting a VNFM for every VNF of the NSD."""
        params_vnf = []
        for vnf in ignore_case_get(plan_dict, "vnfs"):
            vnfd_id = vnf['properties']['id']
            vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
            logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
            if isinstance(vnfm_type, list):
                # Only the first listed VNFM type is used.
                vnfm_type = vnfm_type[0]
            # NOTE(review): placements are keyed by the request's vnfProfileId
            # but looked up here by the VNFD id - confirm both carry the same value.
            vimid = self.get_vnf_vim_id(vim_id, vnf_vim, vnfd_id)
            s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
            vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)

            params_vnf.append({
                "vnfProfileId": vnf["vnf_id"],
                "additionalParam": {
                    "properties": json.dumps(vnf['properties']),
                    "vimId": vimid,
                    "vnfmInstanceId": vnfm_info["vnfmId"],
                    "vnfmType": vnfm_type,
                    "inputs": params_json
                }
            })
        return params_vnf

    def _save_wso2_mappings(self, ns_inst, plan_dict):
        """Persist the package, input-parameter and VNFFG records saved only for the wso2 option."""
        service_tpl = get_servicetemplate(ns_inst.nsd_id)
        DefPkgMappingModel(service_id=self.ns_inst_id,
                           service_def_id=service_tpl['csarId'],
                           template_name=service_tpl['templateName'],
                           template_id=service_tpl['serviceTemplateId']).save()

        # additionalParamForNs is optional in the request, so do not index it blindly.
        for key, val in self.req_data.get('additionalParamForNs', {}).items():
            InputParamMappingModel(service_id=self.ns_inst_id,
                                   input_key=key,
                                   input_value=val).save()

        for vnffg in ignore_case_get(plan_dict, "vnffgs"):
            VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
                           vnffginstid=str(uuid.uuid4()),
                           nsinstid=self.ns_inst_id,
                           endpointnumber=0).save()

    def _report_workflow_start(self, engine, job_id, occ_id, ret):
        """Log an external engine's reply, record job progress and build the response dict."""
        logger.info("ns-instant(%s) workflow result:%s", self.ns_inst_id, ret)
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) %s workflow started: %s' % (
            self.ns_inst_id, engine, ret.get('status')))
        if ret.get('status') == 1:
            return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
        return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
        """
        Start WSO2 workflow
        :param job_id: job that receives the progress update
        :param ns_inst: NS instance record; its ``nsd_id`` selects the service template
        :param plan_input: workflow input, sent as ``planInput``
        :param occ_id: operation occurrence id, echoed in the success response
        :return: response dict (HTTP 200 with ``jobId`` when the engine replies
            with status 1, otherwise HTTP 500 with the engine's message)
        """
        # todo occ_id
        servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
        process_id = get_process_id('init', servicetemplate_id)
        data = {"processId": process_id, "params": {"planInput": plan_input}}
        logger.debug('ns-instant(%s) workflow data:%s', self.ns_inst_id, data)

        ret = workflow_run(data)
        return self._report_workflow_start('wso2', job_id, occ_id, ret)

    def start_activiti_workflow(self, job_id, plan_input, occ_id):
        """
        Start activiti workflow
        :param job_id: job that receives the progress update
        :param plan_input: workflow input, sent as ``params``
        :param occ_id: operation occurrence id, echoed in the success response
        :return: response dict (HTTP 200 with ``jobId`` when the engine replies
            with status 1, otherwise HTTP 500 with the engine's message)
        :raises NSLCMException: when no workflow plan has been deployed
        """
        # todo occ_id
        plans = WFPlanModel.objects.filter()
        if not plans:
            raise NSLCMException("No plan is found, you should deploy plan first!")
        data = {
            "processId": plans[0].process_id,
            "params": plan_input
        }
        ret = activiti.exec_workflow(data)
        return self._report_workflow_start('activiti', job_id, occ_id, ret)

    def start_buildin_workflow(self, job_id, plan_input, occ_id):
        """
        Start buildin workflow
        :param job_id: job that receives the progress update
        :param plan_input: workflow input handed to the worker thread
        :param occ_id: operation occurrence id handed to the worker thread
        :return: HTTP 200 response dict, returned right after the thread is started
        """
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
        BuildInWorkflowThread(plan_input, occ_id).start()
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)

    def start_buildin_grapflow(self, job_id, plan_input, occ_id):
        """
        Start buildin graph workflow
        :param job_id: job that receives the progress update
        :param plan_input: workflow input passed to ``run_ns_instantiate``
        :param occ_id: operation occurrence id passed to ``run_ns_instantiate``
        :return: HTTP 200 response dict, built after ``run_ns_instantiate`` returns
        """
        JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
        run_ns_instantiate(plan_input, occ_id)
        return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)

    @staticmethod
    def _split_vim_id(vim_id):
        """Split ``<cloudOwner>_<cloudRegionId>`` on the first underscore into a vim id dict."""
        # Splitting on the first "_" only keeps a region id that itself
        # contains "_" intact, so re-joining owner and region restores the input.
        cloud_owner, separator, cloud_regionid = vim_id.partition('_')
        if not separator:
            raise NSLCMException("Invalid vim id(%s), expected <cloudOwner>_<cloudRegionId>." % vim_id)
        return {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}

    @staticmethod
    def get_vnf_vim_id(vim_id, vnf_vim, vnfdid):
        """
        Pick the VIM for one VNF.
        :param vim_id: default vim id dict of the NS (may be empty)
        :param vnf_vim: per-VNF vim id dicts, as built by ``get_vnf_vim_info``
        :param vnfdid: key of the VNF in ``vnf_vim``
        :return: the VNF specific vim id when present, else the default
        :raises NSLCMException: when neither is available
        """
        new_vim_id = vnf_vim.get(vnfdid) or vim_id
        if new_vim_id:
            return new_vim_id
        raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)

    @staticmethod
    def get_vnf_vim_info(location_constraints):
        """
        Map each ``vnfProfileId`` to its vim id dict.

        Two constraint formats are accepted: an explicit ``cloudOwner`` /
        ``cloudRegionId`` pair, or a combined ``vimId`` string of the form
        ``<cloudOwner>_<cloudRegionId>``. The format is detected from the keys
        that are present, so extra keys do not change the result.

        :param location_constraints: list of location constraint dicts
        :return: dict of ``vnfProfileId`` -> ``{"cloud_owner", "cloud_regionid"}``;
            entries without ``vnfProfileId`` or without usable constraints
            are skipped
        """
        vnf_vim = {}
        for location in location_constraints:
            if "vnfProfileId" not in location:
                continue
            constraints = location.get('locationConstraints') or {}
            if "cloudOwner" in constraints and "cloudRegionId" in constraints:
                vim_id = {"cloud_owner": constraints["cloudOwner"],
                          "cloud_regionid": constraints["cloudRegionId"]}
            elif "vimId" in constraints:
                vim_id = InstantNSService._split_vim_id(constraints["vimId"])
            else:
                continue
            vnf_vim[location["vnfProfileId"]] = vim_id
        return vnf_vim

    @staticmethod
    def set_vl_vim_id(vim_id, vnf_vim, plan_dict):
        """
        Write ``properties.location_info.vimid`` on every VL of the plan, in place.

        A VL takes the VIM of the VNF that lists it in ``dependencies``; when
        that VNF has no specific VIM the NS default is used.

        :param vim_id: default vim id dict of the NS
        :param vnf_vim: per-VNF vim id dicts
        :param plan_dict: parsed NSD, modified in place
        """
        if "vls" not in plan_dict:
            logger.debug("No vl is found in nsd.")
            return

        # vl_id -> id of the VNF depending on it (the last VNF wins).
        vl_vnf = {}
        for vnf in ignore_case_get(plan_dict, "vnfs"):
            for depend in vnf.get("dependencies", []):
                vl_vnf[depend["vl_id"]] = vnf['properties']['id']

        for vl in plan_dict["vls"]:
            vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
            vimid = ignore_case_get(vnf_vim, vnfdid) or vim_id
            vl["properties"].setdefault("location_info", {})["vimid"] = vimid

    @staticmethod
    def get_model_count(context):
        """
        Count the top-level elements of an NSD.
        :param context: NSD as JSON text
        :return: dict with ``vlCount``, ``sfcCount`` (length of ``fps``),
            ``vnfCount`` and ``pnfCount``; a missing list counts as 0
        """
        data = json.loads(context)
        return {
            'vlCount': len(data.get('vls', [])),
            'sfcCount': len(data.get('fps', [])),
            'vnfCount': len(data.get('vnfs', [])),
            'pnfCount': len(data.get('pnfs', [])),
        }

    def init_pnf_para(self, plan_dict):
        """
        Build the PNF creation parameters.

        Each request entry of ``addpnfData`` is matched against the NSD's
        ``pnfs`` by descriptor id (``pnfdId`` == ``properties.descriptor_id``).

        :param plan_dict: parsed NSD
        :return: dict of NSD ``pnf_id`` -> ``CreatePnf`` input
        """
        pnfs_in_input = ignore_case_get(self.req_data, "addpnfData", [])
        pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs", [])
        logger.debug("addpnfData ; %s", pnfs_in_input)
        logger.debug("pnfs_in_nsd ; %s", pnfs_in_nsd)
        pnfs = {}
        for pnf in pnfs_in_input:
            for pnfd in pnfs_in_nsd:
                if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
                    # The request entry itself is tagged with the NS instance
                    # id, i.e. the caller's data is modified in place.
                    pnf["nsInstances"] = self.ns_inst_id
                    pnfs[pnfd["pnf_id"]] = {
                        "type": "CreatePnf",
                        "input": {
                            "content": pnf
                        }
                    }
        return pnfs