1f72b329e84873dcc986fa5340ba20279ac1d9d6
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37
38 logger = logging.getLogger(__name__)
39
40
41 class BuildInWorkflowThread(Thread):
42     def __init__(self, plan_input):
43         Thread.__init__(self)
44         self.plan_input = plan_input
45
46     def run(self):
47         build_in.run_ns_instantiate(self.plan_input)
48
49
50 class InstantNSService(object):
51     def __init__(self, ns_inst_id, plan_content):
52         self.ns_inst_id = ns_inst_id
53         self.req_data = plan_content
54
55     def do_biz(self):
56         try:
57             job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
58             logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
59             logger.debug('req_data=%s' % self.req_data)
60             ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
61             vim_id = ''
62
63             input_parameters = []
64             if 'additionalParamForNs' in self.req_data:
65                 for key, val in self.req_data['additionalParamForNs'].items():
66                     input_parameters.append({"key": key, "value": val})
67                 if 'location' in self.req_data['additionalParamForNs']:
68                     vim_id = self.req_data['additionalParamForNs']['location']
69                 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
70             else:
71                 params_json = json.JSONEncoder().encode({})
72
73             location_constraints = []
74             if 'locationConstraints' in self.req_data:
75                 location_constraints = self.req_data['locationConstraints']
76
77             JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
78             dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
79             logger.debug('tosca plan dest: %s' % dst_plan)
80             logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
81             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
82
83             params_vnf = []
84             plan_dict = json.JSONDecoder().decode(dst_plan)
85             for vnf in ignore_case_get(plan_dict, "vnfs"):
86                 vnfd_id = vnf['properties']['id']
87                 vnfm_type_temp = vnf['properties'].get("vnfm_info", "undefined")
88                 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type_temp)
89                 if vnfm_type_temp != "undefined":
90                     if isinstance(vnfm_type_temp, list):
91                         vnfm_type = vnfm_type_temp[0]
92                     if isinstance(vnfm_type_temp, str):
93                         vnfm_type = vnfm_type_temp
94                 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
95                 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
96
97                 params_vnf.append({
98                     "vnfProfileId": vnf["vnf_id"],
99                     "additionalParam": {
100                         "properties": json.JSONEncoder().encode(vnf['properties']),
101                         "vimId": vimid,
102                         "vnfmInstanceId": vnfm_info["vnfmId"],
103                         "vnfmType": vnfm_type,
104                         "inputs": params_json
105                     }
106                 })
107
108             self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
109             dst_plan = json.JSONEncoder().encode(plan_dict)
110             logger.debug('tosca plan dest add vimid:%s' % dst_plan)
111             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
112
113             pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
114
115             vnf_params_json = json.JSONEncoder().encode(params_vnf)
116             plan_input = {
117                 'jobId': job_id,
118                 'nsInstanceId': self.ns_inst_id,
119                 'object_context': dst_plan,
120                 'object_additionalParamForNs': params_json,
121                 'object_additionalParamForVnf': vnf_params_json,
122                 'object_additionalParamForPnf': pnf_params_json
123             }
124             plan_input.update(**self.get_model_count(dst_plan))
125
126             if 'additionalParamForNs' in self.req_data:
127                 plan_input["sdnControllerId"] = ignore_case_get(
128                     self.req_data['additionalParamForNs'], "sdncontroller")
129
130             ServiceBaseInfoModel(service_id=self.ns_inst_id,
131                                  service_name=ns_inst.name,
132                                  service_type='NFVO',
133                                  description=ns_inst.description,
134                                  active_status='--',
135                                  status=ns_inst.status,
136                                  creator='--',
137                                  create_time=int(time.time() * 1000)).save()
138
139             if config.WORKFLOW_OPTION == "wso2":
140                 service_tpl = get_servicetemplate(ns_inst.nsd_id)
141                 DefPkgMappingModel(service_id=self.ns_inst_id,
142                                    service_def_id=service_tpl['csarId'],
143                                    template_name=service_tpl['templateName'],
144                                    template_id=service_tpl['serviceTemplateId']).save()
145
146                 for key, val in self.req_data['additionalParamForNs'].items():
147                     InputParamMappingModel(service_id=self.ns_inst_id,
148                                            input_key=key,
149                                            input_value=val).save()
150
151                 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
152                     VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
153                                    vnffginstid=str(uuid.uuid4()),
154                                    nsinstid=self.ns_inst_id,
155                                    endpointnumber=0).save()
156             else:
157                 # TODO:
158                 pass
159             logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
160             if config.WORKFLOW_OPTION == "wso2":
161                 return self.start_wso2_workflow(job_id, ns_inst, plan_input)
162             elif config.WORKFLOW_OPTION == "activiti":
163                 return self.start_activiti_workflow()
164             elif config.WORKFLOW_OPTION == "grapflow":
165                 return self.start_buildin_grapflow(job_id, plan_input)
166             else:
167                 return self.start_buildin_workflow(job_id, plan_input)
168
169         except Exception as e:
170             logger.error(traceback.format_exc())
171             logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
172             JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
173             return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
174
175     def start_wso2_workflow(self, job_id, ns_inst, plan_input):
176         servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
177         process_id = get_process_id('init', servicetemplate_id)
178         data = {"processId": process_id, "params": {"planInput": plan_input}}
179         logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
180
181         ret = workflow_run(data)
182         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
183         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
184             self.ns_inst_id, ret.get('status')))
185         if ret.get('status') == 1:
186             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
187         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
188
189     def start_activiti_workflow(self, job_id, plan_input):
190         plans = WFPlanModel.objects.filter()
191         if not plans:
192             raise NSLCMException("No plan is found, you should deploy plan first!")
193         data = {
194             "processId": plans[0].process_id,
195             "params": plan_input
196         }
197         ret = activiti.exec_workflow(data)
198         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
199         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
200             self.ns_inst_id, ret.get('status')))
201         if ret.get('status') == 1:
202             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
203         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
204
205     def start_buildin_workflow(self, job_id, plan_input):
206         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
207         BuildInWorkflowThread(plan_input).start()
208         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
209
210     def start_buildin_grapflow(self, job_id, plan_input):
211         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
212         run_ns_instantiate(plan_input)
213         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
214
215     @staticmethod
216     def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
217         for location in location_constraints:
218             if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
219                 return location["locationConstraints"]["vimId"]
220         if vim_id:
221             return vim_id
222         raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
223
224     @staticmethod
225     def set_vl_vim_id(vim_id, location_constraints, plan_dict):
226         if "vls" not in plan_dict:
227             logger.debug("No vl is found in nsd.")
228             return
229         vl_vnf = {}
230         for vnf in ignore_case_get(plan_dict, "vnfs"):
231             if "dependencies" in vnf:
232                 for depend in vnf["dependencies"]:
233                     vl_vnf[depend["vl_id"]] = vnf['properties']['id']
234         vnf_vim = {}
235         for location in location_constraints:
236             if "vnfProfileId" in location:
237                 vnfd_id = location["vnfProfileId"]
238                 vnf_vim[vnfd_id] = location["locationConstraints"]["vimId"]
239         for vl in plan_dict["vls"]:
240             vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
241             vimid = ignore_case_get(vnf_vim, vnfdid)
242             if not vimid:
243                 vimid = vim_id
244             if not vimid:
245                 raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
246             if "location_info" not in vl["properties"]:
247                 vl["properties"]["location_info"] = {}
248             vl["properties"]["location_info"]["vimid"] = vimid
249
250     @staticmethod
251     def get_model_count(context):
252         data = json.JSONDecoder().decode(context)
253         vls = len(data.get('vls', []))
254         sfcs = len(data.get('fps', []))
255         vnfs = len(data.get('vnfs', []))
256         pnfs = len(data.get('pnfs', []))
257         return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs), 'pnfCount': str(pnfs)}
258
259     def init_pnf_para(self, plan_dict):
260         pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
261         pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
262         logger.debug("addpnfData ; %s" % pnfs_in_input)
263         logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
264         pnfs = {}
265         for pnf in pnfs_in_input:
266             for pnfd in pnfs_in_nsd:
267                 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
268                     k = pnfd["pnf_id"]
269                     pnf["nsInstances"] = self.ns_inst_id
270                     # todo pnf["pnfdInfoId"]
271                     pnfs[k] = {
272                         "type": "CreatePnf",
273                         "input": {
274                             "content": pnf
275                         }
276                     }
277         return pnfs