Merge "Fix bug for ns inst"
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38
39 logger = logging.getLogger(__name__)
40
41
42 class BuildInWorkflowThread(Thread):
43     def __init__(self, plan_input, occ_id):
44         Thread.__init__(self)
45         self.plan_input = plan_input
46         self.occ_id = occ_id
47
48     def run(self):
49         build_in.run_ns_instantiate(self.plan_input, self.occ_id)
50
51
52 class InstantNSService(object):
53     def __init__(self, ns_inst_id, plan_content):
54         self.ns_inst_id = ns_inst_id
55         self.req_data = plan_content
56
57     def do_biz(self):
58         job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
59         occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
60
61         try:
62             logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
63             logger.debug('req_data=%s' % self.req_data)
64             ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
65             vim_id = {}
66
67             input_parameters = []
68             if 'additionalParamForNs' in self.req_data:
69                 for key, val in self.req_data['additionalParamForNs'].items():
70                     input_parameters.append({"key": key, "value": val})
71
72                 if 'location' in self.req_data['additionalParamForNs']:
73                     cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
74                     cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
75                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
76                 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
77             else:
78                 params_json = json.JSONEncoder().encode({})
79
80             location_constraints = []
81             if 'locationConstraints' in self.req_data:
82                 location_constraints = self.req_data['locationConstraints']
83
84             JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
85             dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
86             logger.debug('tosca plan dest: %s' % dst_plan)
87             logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
88             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
89
90             params_vnf = []
91             plan_dict = json.JSONDecoder().decode(dst_plan)
92             for vnf in ignore_case_get(plan_dict, "vnfs"):
93                 vnfd_id = vnf['properties']['id']
94                 vnfm_type_temp = vnf['properties'].get("vnfm_info", "undefined")
95                 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type_temp)
96                 vnfm_type = vnfm_type_temp
97                 if isinstance(vnfm_type_temp, list):
98                     vnfm_type = vnfm_type_temp[0]
99                 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
100                 s_vimid = vimid
101                 if isinstance(vimid, dict):
102                     s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
103                 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
104
105                 params_vnf.append({
106                     "vnfProfileId": vnf["vnf_id"],
107                     "additionalParam": {
108                         "properties": json.JSONEncoder().encode(vnf['properties']),
109                         "vimId": vimid,
110                         "vnfmInstanceId": vnfm_info["vnfmId"],
111                         "vnfmType": vnfm_type,
112                         "inputs": params_json
113                     }
114                 })
115
116             self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
117             dst_plan = json.JSONEncoder().encode(plan_dict)
118             logger.debug('tosca plan dest add vimid:%s' % dst_plan)
119             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
120
121             pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
122
123             vnf_params_json = json.JSONEncoder().encode(params_vnf)
124             plan_input = {
125                 'jobId': job_id,
126                 'nsInstanceId': self.ns_inst_id,
127                 'object_context': dst_plan,
128                 'object_additionalParamForNs': params_json,
129                 'object_additionalParamForVnf': vnf_params_json,
130                 'object_additionalParamForPnf': pnf_params_json
131             }
132             plan_input.update(**self.get_model_count(dst_plan))
133
134             if 'additionalParamForNs' in self.req_data:
135                 plan_input["sdnControllerId"] = ignore_case_get(
136                     self.req_data['additionalParamForNs'], "sdncontroller")
137
138             ServiceBaseInfoModel(service_id=self.ns_inst_id,
139                                  service_name=ns_inst.name,
140                                  service_type='NFVO',
141                                  description=ns_inst.description,
142                                  active_status='--',
143                                  status=ns_inst.status,
144                                  creator='--',
145                                  create_time=int(time.time() * 1000)).save()
146
147             if config.WORKFLOW_OPTION == "wso2":
148                 service_tpl = get_servicetemplate(ns_inst.nsd_id)
149                 DefPkgMappingModel(service_id=self.ns_inst_id,
150                                    service_def_id=service_tpl['csarId'],
151                                    template_name=service_tpl['templateName'],
152                                    template_id=service_tpl['serviceTemplateId']).save()
153
154                 for key, val in self.req_data['additionalParamForNs'].items():
155                     InputParamMappingModel(service_id=self.ns_inst_id,
156                                            input_key=key,
157                                            input_value=val).save()
158
159                 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
160                     VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
161                                    vnffginstid=str(uuid.uuid4()),
162                                    nsinstid=self.ns_inst_id,
163                                    endpointnumber=0).save()
164             else:
165                 # TODO
166                 pass
167             logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
168             if config.WORKFLOW_OPTION == "wso2":
169                 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
170             elif config.WORKFLOW_OPTION == "activiti":
171                 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
172             elif config.WORKFLOW_OPTION == "grapflow":
173                 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
174             else:
175                 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
176
177         except Exception as e:
178             logger.error(traceback.format_exc())
179             logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
180             NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
181             JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
182             return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
183
184     def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
185         # todo occ_id
186         servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
187         process_id = get_process_id('init', servicetemplate_id)
188         data = {"processId": process_id, "params": {"planInput": plan_input}}
189         logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
190
191         ret = workflow_run(data)
192         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
193         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
194             self.ns_inst_id, ret.get('status')))
195         if ret.get('status') == 1:
196             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
197         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
198
199     def start_activiti_workflow(self, job_id, plan_input, occ_id):
200         # todo occ_id
201         plans = WFPlanModel.objects.filter()
202         if not plans:
203             raise NSLCMException("No plan is found, you should deploy plan first!")
204         data = {
205             "processId": plans[0].process_id,
206             "params": plan_input
207         }
208         ret = activiti.exec_workflow(data)
209         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
210         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
211             self.ns_inst_id, ret.get('status')))
212         if ret.get('status') == 1:
213             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
214         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
215
216     def start_buildin_workflow(self, job_id, plan_input, occ_id):
217         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
218         BuildInWorkflowThread(plan_input, occ_id).start()
219         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
220
221     def start_buildin_grapflow(self, job_id, plan_input, occ_id):
222         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
223         run_ns_instantiate(plan_input, occ_id)
224         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
225
226     @staticmethod
227     def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
228         for location in location_constraints:
229             if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
230                 # if 'vimId' in location['locationConstraints']:
231                 if len(location['locationConstraints']) == 1:
232                     cloud_owner = location['locationConstraints']["vimId"].split('_')[0]
233                     cloud_regionid = location['locationConstraints']["vimId"].split('_')[1]
234                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
235                 elif len(location['locationConstraints']) == 2:
236                     cloud_owner = location['locationConstraints']["cloudOwner"]
237                     cloud_regionid = location['locationConstraints']["cloudRegionId"]
238                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
239                 return vim_id
240         if vim_id:
241             return vim_id
242         raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
243
244     @staticmethod
245     def set_vl_vim_id(vim_id, location_constraints, plan_dict):
246         if "vls" not in plan_dict:
247             logger.debug("No vl is found in nsd.")
248             return
249         vl_vnf = {}
250         for vnf in ignore_case_get(plan_dict, "vnfs"):
251             if "dependencies" in vnf:
252                 for depend in vnf["dependencies"]:
253                     vl_vnf[depend["vl_id"]] = vnf['properties']['id']
254         vnf_vim = {}
255
256         for location in location_constraints:
257             if "vnfProfileId" in location:
258                 vnfd_id = location["vnfProfileId"]
259                 # if 'vimId' in location["locationConstraints"]:
260                 if len(location['locationConstraints']) == 1:
261                     cloud_owner = location["locationConstraints"]["vimId"].split('_')[0]
262                     cloud_regionid = location["locationConstraints"]["vimId"].split('_')[1]
263                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
264                     vnf_vim[vnfd_id] = vim_id
265                 elif len(location['locationConstraints']) == 2:
266                     cloud_owner = location["locationConstraints"]["cloudOwner"]
267                     cloud_regionid = location["locationConstraints"]["cloudRegionId"]
268                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
269                     vnf_vim[vnfd_id] = vim_id
270         for vl in plan_dict["vls"]:
271             vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
272             vimid = ignore_case_get(vnf_vim, vnfdid)
273             if not vimid:
274                 vimid = vim_id
275             if not vimid:
276                 raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
277             if "location_info" not in vl["properties"]:
278                 vl["properties"]["location_info"] = {}
279             vl["properties"]["location_info"]["vimid"] = vimid
280
281     @staticmethod
282     def get_model_count(context):
283         data = json.JSONDecoder().decode(context)
284         vls = len(data.get('vls', []))
285         sfcs = len(data.get('fps', []))
286         vnfs = len(data.get('vnfs', []))
287         pnfs = len(data.get('pnfs', []))
288         return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs), 'pnfCount': str(pnfs)}
289
290     def init_pnf_para(self, plan_dict):
291         pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
292         pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
293         logger.debug("addpnfData ; %s" % pnfs_in_input)
294         logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
295         pnfs = {}
296         for pnf in pnfs_in_input:
297             for pnfd in pnfs_in_nsd:
298                 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
299                     k = pnfd["pnf_id"]
300                     pnf["nsInstances"] = self.ns_inst_id
301                     pnfs[k] = {
302                         "type": "CreatePnf",
303                         "input": {
304                             "content": pnf
305                         }
306                     }
307         return pnfs