Add testcase for ns instantiate
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38
39 logger = logging.getLogger(__name__)
40
41
class BuildInWorkflowThread(Thread):
    """Thread that runs the built-in NS instantiate workflow.

    The plan input and the operation-occurrence id given at construction
    are passed to ``build_in.run_ns_instantiate`` when the thread runs.
    """

    def __init__(self, plan_input, occ_id):
        super(BuildInWorkflowThread, self).__init__()
        self.occ_id = occ_id
        self.plan_input = plan_input

    def run(self):
        """Thread body: call the built-in workflow with the stored arguments."""
        workflow_args = (self.plan_input, self.occ_id)
        build_in.run_ns_instantiate(*workflow_args)
50
51
52 class InstantNSService(object):
53     def __init__(self, ns_inst_id, plan_content):
54         self.ns_inst_id = ns_inst_id
55         self.req_data = plan_content
56
57     def do_biz(self):
58         job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
59         occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
60
61         try:
62             logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
63             logger.debug('req_data=%s' % self.req_data)
64             ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
65             vim_id = {}
66
67             input_parameters = []
68             if 'additionalParamForNs' in self.req_data:
69                 for key, val in self.req_data['additionalParamForNs'].items():
70                     input_parameters.append({"key": key, "value": val})
71
72                 if 'location' in self.req_data['additionalParamForNs']:
73                     cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
74                     cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
75                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
76                 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
77             else:
78                 params_json = json.JSONEncoder().encode({})
79
80             location_constraints = []
81             if 'locationConstraints' in self.req_data:
82                 location_constraints = self.req_data['locationConstraints']
83
84             JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
85             dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
86             logger.debug('tosca plan dest: %s' % dst_plan)
87             logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
88             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
89
90             params_vnf = []
91             plan_dict = json.JSONDecoder().decode(dst_plan)
92             for vnf in ignore_case_get(plan_dict, "vnfs"):
93                 vnfd_id = vnf['properties']['id']
94                 vnfm_type_temp = vnf['properties'].get("vnfm_info", "undefined")
95                 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type_temp)
96                 vnfm_type = vnfm_type_temp
97                 if isinstance(vnfm_type_temp, list):
98                     vnfm_type = vnfm_type_temp[0]
99                 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
100                 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
101
102                 params_vnf.append({
103                     "vnfProfileId": vnf["vnf_id"],
104                     "additionalParam": {
105                         "properties": json.JSONEncoder().encode(vnf['properties']),
106                         "vimId": vimid,
107                         "vnfmInstanceId": vnfm_info["vnfmId"],
108                         "vnfmType": vnfm_type,
109                         "inputs": params_json
110                     }
111                 })
112
113             self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
114             dst_plan = json.JSONEncoder().encode(plan_dict)
115             logger.debug('tosca plan dest add vimid:%s' % dst_plan)
116             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
117
118             pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
119
120             vnf_params_json = json.JSONEncoder().encode(params_vnf)
121             plan_input = {
122                 'jobId': job_id,
123                 'nsInstanceId': self.ns_inst_id,
124                 'object_context': dst_plan,
125                 'object_additionalParamForNs': params_json,
126                 'object_additionalParamForVnf': vnf_params_json,
127                 'object_additionalParamForPnf': pnf_params_json
128             }
129             plan_input.update(**self.get_model_count(dst_plan))
130
131             if 'additionalParamForNs' in self.req_data:
132                 plan_input["sdnControllerId"] = ignore_case_get(
133                     self.req_data['additionalParamForNs'], "sdncontroller")
134
135             ServiceBaseInfoModel(service_id=self.ns_inst_id,
136                                  service_name=ns_inst.name,
137                                  service_type='NFVO',
138                                  description=ns_inst.description,
139                                  active_status='--',
140                                  status=ns_inst.status,
141                                  creator='--',
142                                  create_time=int(time.time() * 1000)).save()
143
144             if config.WORKFLOW_OPTION == "wso2":
145                 service_tpl = get_servicetemplate(ns_inst.nsd_id)
146                 DefPkgMappingModel(service_id=self.ns_inst_id,
147                                    service_def_id=service_tpl['csarId'],
148                                    template_name=service_tpl['templateName'],
149                                    template_id=service_tpl['serviceTemplateId']).save()
150
151                 for key, val in self.req_data['additionalParamForNs'].items():
152                     InputParamMappingModel(service_id=self.ns_inst_id,
153                                            input_key=key,
154                                            input_value=val).save()
155
156                 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
157                     VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
158                                    vnffginstid=str(uuid.uuid4()),
159                                    nsinstid=self.ns_inst_id,
160                                    endpointnumber=0).save()
161             else:
162                 # TODO
163                 pass
164             logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
165             if config.WORKFLOW_OPTION == "wso2":
166                 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
167             elif config.WORKFLOW_OPTION == "activiti":
168                 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
169             elif config.WORKFLOW_OPTION == "grapflow":
170                 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
171             else:
172                 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
173
174         except Exception as e:
175             logger.error(traceback.format_exc())
176             logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
177             NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.message)
178             JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
179             return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
180
181     def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
182         # todo occ_id
183         servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
184         process_id = get_process_id('init', servicetemplate_id)
185         data = {"processId": process_id, "params": {"planInput": plan_input}}
186         logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
187
188         ret = workflow_run(data)
189         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
190         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
191             self.ns_inst_id, ret.get('status')))
192         if ret.get('status') == 1:
193             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
194         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
195
196     def start_activiti_workflow(self, job_id, plan_input, occ_id):
197         # todo occ_id
198         plans = WFPlanModel.objects.filter()
199         if not plans:
200             raise NSLCMException("No plan is found, you should deploy plan first!")
201         data = {
202             "processId": plans[0].process_id,
203             "params": plan_input
204         }
205         ret = activiti.exec_workflow(data)
206         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
207         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
208             self.ns_inst_id, ret.get('status')))
209         if ret.get('status') == 1:
210             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
211         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
212
213     def start_buildin_workflow(self, job_id, plan_input, occ_id):
214         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
215         BuildInWorkflowThread(plan_input, occ_id).start()
216         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
217
218     def start_buildin_grapflow(self, job_id, plan_input, occ_id):
219         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
220         run_ns_instantiate(plan_input, occ_id)
221         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
222
223     @staticmethod
224     def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
225         for location in location_constraints:
226             if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
227                 # if 'vimId' in location['locationConstraints']:
228                 if len(location['locationConstraints']) == 1:
229                     cloud_owner = location['locationConstraints']["vimId"].split('_')[0]
230                     cloud_regionid = location['locationConstraints']["vimId"].split('_')[1]
231                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
232                 elif len(location['locationConstraints']) == 2:
233                     cloud_owner = location['locationConstraints']["cloudOwner"]
234                     cloud_regionid = location['locationConstraints']["cloudRegionId"]
235                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
236                 return vim_id
237         if vim_id:
238             return vim_id
239         raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
240
241     @staticmethod
242     def set_vl_vim_id(vim_id, location_constraints, plan_dict):
243         if "vls" not in plan_dict:
244             logger.debug("No vl is found in nsd.")
245             return
246         vl_vnf = {}
247         for vnf in ignore_case_get(plan_dict, "vnfs"):
248             if "dependencies" in vnf:
249                 for depend in vnf["dependencies"]:
250                     vl_vnf[depend["vl_id"]] = vnf['properties']['id']
251         vnf_vim = {}
252
253         for location in location_constraints:
254             if "vnfProfileId" in location:
255                 vnfd_id = location["vnfProfileId"]
256                 # if 'vimId' in location["locationConstraints"]:
257                 if len(location['locationConstraints']) == 1:
258                     cloud_owner = location["locationConstraints"]["vimId"].split('_')[0]
259                     cloud_regionid = location["locationConstraints"]["vimId"].split('_')[1]
260                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
261                     vnf_vim[vnfd_id] = vim_id
262                 elif len(location['locationConstraints']) == 2:
263                     cloud_owner = location["locationConstraints"]["cloudOwner"]
264                     cloud_regionid = location["locationConstraints"]["cloudRegionId"]
265                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
266                     vnf_vim[vnfd_id] = vim_id
267         for vl in plan_dict["vls"]:
268             vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
269             vimid = ignore_case_get(vnf_vim, vnfdid)
270             if not vimid:
271                 vimid = vim_id
272             if not vimid:
273                 raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
274             if "location_info" not in vl["properties"]:
275                 vl["properties"]["location_info"] = {}
276             vl["properties"]["location_info"]["vimid"] = vimid
277
278     @staticmethod
279     def get_model_count(context):
280         data = json.JSONDecoder().decode(context)
281         vls = len(data.get('vls', []))
282         sfcs = len(data.get('fps', []))
283         vnfs = len(data.get('vnfs', []))
284         pnfs = len(data.get('pnfs', []))
285         return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs), 'pnfCount': str(pnfs)}
286
287     def init_pnf_para(self, plan_dict):
288         pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
289         pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
290         logger.debug("addpnfData ; %s" % pnfs_in_input)
291         logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
292         pnfs = {}
293         for pnf in pnfs_in_input:
294             for pnfd in pnfs_in_nsd:
295                 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
296                     k = pnfd["pnf_id"]
297                     pnf["nsInstances"] = self.ns_inst_id
298                     pnfs[k] = {
299                         "type": "CreatePnf",
300                         "input": {
301                             "content": pnf
302                         }
303                     }
304         return pnfs