modify ns instance
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37 from lcm.ns.biz.ns_lcm_op_occ import NsLcmOpOcc
38 from lcm.ns.enum import NS_INST_STATUS
39
40 logger = logging.getLogger(__name__)
41
42
43 class BuildInWorkflowThread(Thread):
44     def __init__(self, plan_input, occ_id):
45         Thread.__init__(self)
46         self.plan_input = plan_input
47         self.occ_id = occ_id
48
49     def run(self):
50         build_in.run_ns_instantiate(self.plan_input, self.occ_id)
51
52
53 class InstantNSService(object):
54     def __init__(self, ns_inst_id, plan_content):
55         self.ns_inst_id = ns_inst_id
56         self.req_data = plan_content
57
58     def do_biz(self):
59         job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
60         occ_id = NsLcmOpOcc.create(self.ns_inst_id, "INSTANTIATE", "PROCESSING", False, self.req_data)
61         NSInstModel.objects.filter(id=self.ns_inst_id).update(status=NS_INST_STATUS.INSTANTIATING)
62
63         try:
64             logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
65             logger.debug('req_data=%s' % self.req_data)
66             ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
67             vim_id = {}
68
69             input_parameters = []
70             if 'additionalParamForNs' in self.req_data:
71                 for key, val in list(self.req_data['additionalParamForNs'].items()):
72                     input_parameters.append({"key": key, "value": val})
73
74                 if 'location' in self.req_data['additionalParamForNs']:
75                     cloud_owner = self.req_data['additionalParamForNs']['location'].split('_')[0]
76                     cloud_regionid = self.req_data["additionalParamForNs"]["location"].split('_')[1]
77                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
78                 params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
79             else:
80                 params_json = json.JSONEncoder().encode({})
81
82             location_constraints = [] if not self.req_data.get('locationConstraints')\
83                 else self.req_data['locationConstraints']
84             vnf_vim = self.get_vnf_vim_info(location_constraints)
85
86             JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
87             dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
88             logger.debug('tosca plan dest: %s' % dst_plan)
89             logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
90             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
91
92             params_vnf = []
93             plan_dict = json.JSONDecoder().decode(dst_plan)
94             for vnf in ignore_case_get(plan_dict, "vnfs"):
95                 vnfd_id = vnf['properties']['id']
96                 vnfm_type = vnf['properties'].get("vnfm_info", "undefined")
97                 logger.debug("vnfd_id: %s, vnfm_type : %s", vnfd_id, vnfm_type)
98                 if isinstance(vnfm_type, list):
99                     vnfm_type = vnfm_type[0]
100                 vimid = self.get_vnf_vim_id(vim_id, vnf_vim, vnfd_id)
101                 s_vimid = "%s_%s" % (vimid["cloud_owner"], vimid["cloud_regionid"])
102                 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=s_vimid)
103
104                 params_vnf.append({
105                     "vnfProfileId": vnf["vnf_id"],
106                     "additionalParam": {
107                         "properties": json.JSONEncoder().encode(vnf['properties']),
108                         "vimId": vimid,
109                         "vnfmInstanceId": vnfm_info["vnfmId"],
110                         "vnfmType": vnfm_type,
111                         "inputs": params_json
112                     }
113                 })
114
115             self.set_vl_vim_id(vim_id, vnf_vim, plan_dict)
116             dst_plan = json.JSONEncoder().encode(plan_dict)
117             logger.debug('tosca plan dest add vimid:%s' % dst_plan)
118             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
119
120             pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
121
122             vnf_params_json = json.JSONEncoder().encode(params_vnf)
123             plan_input = {
124                 'jobId': job_id,
125                 'nsInstanceId': self.ns_inst_id,
126                 'object_context': dst_plan,
127                 'object_additionalParamForNs': params_json,
128                 'object_additionalParamForVnf': vnf_params_json,
129                 'object_additionalParamForPnf': pnf_params_json
130             }
131             plan_input.update(**self.get_model_count(dst_plan))
132
133             if 'additionalParamForNs' in self.req_data:
134                 plan_input["sdnControllerId"] = ignore_case_get(
135                     self.req_data['additionalParamForNs'], "sdncontroller")
136
137             ServiceBaseInfoModel(service_id=self.ns_inst_id,
138                                  service_name=ns_inst.name,
139                                  service_type='NFVO',
140                                  description=ns_inst.description,
141                                  active_status='--',
142                                  status=ns_inst.status,
143                                  creator='--',
144                                  create_time=int(time.time() * 1000)).save()
145
146             if config.WORKFLOW_OPTION == "wso2":
147                 service_tpl = get_servicetemplate(ns_inst.nsd_id)
148                 DefPkgMappingModel(service_id=self.ns_inst_id,
149                                    service_def_id=service_tpl['csarId'],
150                                    template_name=service_tpl['templateName'],
151                                    template_id=service_tpl['serviceTemplateId']).save()
152
153                 for key, val in list(self.req_data['additionalParamForNs'].items()):
154                     InputParamMappingModel(service_id=self.ns_inst_id,
155                                            input_key=key,
156                                            input_value=val).save()
157
158                 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
159                     VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
160                                    vnffginstid=str(uuid.uuid4()),
161                                    nsinstid=self.ns_inst_id,
162                                    endpointnumber=0).save()
163             else:
164                 # TODO
165                 pass
166             logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
167             if config.WORKFLOW_OPTION == "wso2":
168                 return self.start_wso2_workflow(job_id, ns_inst, plan_input, occ_id=occ_id)
169             elif config.WORKFLOW_OPTION == "activiti":
170                 return self.start_activiti_workflow(job_id, plan_input, occ_id=occ_id)
171             elif config.WORKFLOW_OPTION == "grapflow":
172                 return self.start_buildin_grapflow(job_id, plan_input, occ_id=occ_id)
173             else:
174                 return self.start_buildin_workflow(job_id, plan_input, occ_id=occ_id)
175
176         except Exception as e:
177             logger.error(traceback.format_exc())
178             logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.args[0]))
179             NsLcmOpOcc.update(occ_id, operationState="FAILED", error=e.args[0])
180             JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
181             build_in.post_deal(self.ns_inst_id, "false")
182             return dict(data={'error': e.args[0]}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
183
184     def start_wso2_workflow(self, job_id, ns_inst, plan_input, occ_id):
185         # todo occ_id
186         servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
187         process_id = get_process_id('init', servicetemplate_id)
188         data = {"processId": process_id, "params": {"planInput": plan_input}}
189         logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
190
191         ret = workflow_run(data)
192         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
193         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
194             self.ns_inst_id, ret.get('status')))
195         if ret.get('status') == 1:
196             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
197         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
198
199     def start_activiti_workflow(self, job_id, plan_input, occ_id):
200         # todo occ_id
201         plans = WFPlanModel.objects.filter()
202         if not plans:
203             raise NSLCMException("No plan is found, you should deploy plan first!")
204         data = {
205             "processId": plans[0].process_id,
206             "params": plan_input
207         }
208         ret = activiti.exec_workflow(data)
209         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
210         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
211             self.ns_inst_id, ret.get('status')))
212         if ret.get('status') == 1:
213             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
214         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
215
216     def start_buildin_workflow(self, job_id, plan_input, occ_id):
217         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
218         BuildInWorkflowThread(plan_input, occ_id).start()
219         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
220
221     def start_buildin_grapflow(self, job_id, plan_input, occ_id):
222         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
223         run_ns_instantiate(plan_input, occ_id)
224         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK, occ_id=occ_id)
225
226     @staticmethod
227     def get_vnf_vim_id(vim_id, vnf_vim, vnfdid):
228         new_vim_id = vnf_vim.get(vnfdid) if vnf_vim.get(vnfdid) else vim_id
229         if new_vim_id:
230             return new_vim_id
231         raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
232
233     @staticmethod
234     def get_vnf_vim_info(location_constraints):
235         vnf_vim = {}
236         for location in location_constraints:
237             if "vnfProfileId" in location:
238                 vnfd_id = location["vnfProfileId"]
239                 if len(location['locationConstraints']) == 1:
240                     cloud_owner = location["locationConstraints"]["vimId"].split('_')[0]
241                     cloud_regionid = location["locationConstraints"]["vimId"].split('_')[1]
242                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
243                     vnf_vim[vnfd_id] = vim_id
244                 elif len(location['locationConstraints']) == 2:
245                     cloud_owner = location["locationConstraints"]["cloudOwner"]
246                     cloud_regionid = location["locationConstraints"]["cloudRegionId"]
247                     vim_id = {"cloud_owner": cloud_owner, "cloud_regionid": cloud_regionid}
248                     vnf_vim[vnfd_id] = vim_id
249         return vnf_vim
250
251     @staticmethod
252     def set_vl_vim_id(vim_id, vnf_vim, plan_dict):
253         if "vls" not in plan_dict:
254             logger.debug("No vl is found in nsd.")
255             return
256         vl_vnf = {}
257         for vnf in ignore_case_get(plan_dict, "vnfs"):
258             if "dependencies" in vnf:
259                 for depend in vnf["dependencies"]:
260                     vl_vnf[depend["vl_id"]] = vnf['properties']['id']
261         for vl in plan_dict["vls"]:
262             vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
263             vimid = ignore_case_get(vnf_vim, vnfdid)
264             if not vimid:
265                 vimid = vim_id
266             if "location_info" not in vl["properties"]:
267                 vl["properties"]["location_info"] = {}
268             vl["properties"]["location_info"]["vimid"] = vimid
269
270     @staticmethod
271     def get_model_count(context):
272         data = json.JSONDecoder().decode(context)
273         vls = len(data.get('vls', []))
274         sfcs = len(data.get('fps', []))
275         vnfs = len(data.get('vnfs', []))
276         pnfs = len(data.get('pnfs', []))
277         return {'vlCount': vls, 'sfcCount': sfcs, 'vnfCount': vnfs, 'pnfCount': pnfs}
278
279     def init_pnf_para(self, plan_dict):
280         pnfs_in_input = ignore_case_get(self.req_data, "addpnfData", [])
281         pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs", [])
282         logger.debug("addpnfData ; %s" % pnfs_in_input)
283         logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
284         pnfs = {}
285         for pnf in pnfs_in_input:
286             for pnfd in pnfs_in_nsd:
287                 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
288                     k = pnfd["pnf_id"]
289                     pnf["nsInstances"] = self.ns_inst_id
290                     pnfs[k] = {
291                         "type": "CreatePnf",
292                         "input": {
293                             "content": pnf
294                         }
295                     }
296         return pnfs