add NS workflow
[vfc/nfvo/lcm.git] / lcm / ns / biz / ns_instant.py
1 # Copyright 2016-2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import logging
16 import time
17 import traceback
18 import uuid
19 from threading import Thread
20
21 from rest_framework import status
22
23 from lcm.pub.config import config
24 from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
25 from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
26 from lcm.pub.exceptions import NSLCMException
27 from lcm.pub.msapi import activiti
28 from lcm.pub.msapi import sdc_run_catalog
29 from lcm.pub.msapi.catalog import get_process_id
30 from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
31 from lcm.pub.msapi import extsys
32 from lcm.pub.msapi.wso2bpel import workflow_run
33 from lcm.pub.utils.jobutil import JobUtil
34 from lcm.pub.utils.values import ignore_case_get
35 from lcm.workflows import build_in
36 from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
37
38 logger = logging.getLogger(__name__)
39
40
class BuildInWorkflowThread(Thread):
    """Background thread that runs the build-in NS instantiation workflow."""

    def __init__(self, plan_input):
        """Keep the plan input that ``run`` later hands to the workflow.

        :param plan_input: workflow input passed unchanged to
            ``build_in.run_ns_instantiate``
        """
        super(BuildInWorkflowThread, self).__init__()
        self.plan_input = plan_input

    def run(self):
        """Thread entry point: execute the build-in workflow with the stored input."""
        workflow_input = self.plan_input
        build_in.run_ns_instantiate(workflow_input)
48
49
50 class InstantNSService(object):
51     def __init__(self, ns_inst_id, plan_content):
52         self.ns_inst_id = ns_inst_id
53         self.req_data = plan_content
54
55     def do_biz(self):
56         try:
57             job_id = JobUtil.create_job("NS", "NS_INST", self.ns_inst_id)
58             logger.debug('ns-instant(%s) workflow starting...' % self.ns_inst_id)
59             logger.debug('req_data=%s' % self.req_data)
60             ns_inst = NSInstModel.objects.get(id=self.ns_inst_id)
61
62             input_parameters = []
63             for key, val in self.req_data['additionalParamForNs'].items():
64                 input_parameters.append({"key": key, "value": val})
65
66             vim_id = ''
67             if 'location' in self.req_data['additionalParamForNs']:
68                 vim_id = self.req_data['additionalParamForNs']['location']
69             location_constraints = []
70             if 'locationConstraints' in self.req_data:
71                 location_constraints = self.req_data['locationConstraints']
72
73             JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
74             dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
75             logger.debug('tosca plan dest: %s' % dst_plan)
76             logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
77             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
78
79             params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
80             params_vnf = []
81             plan_dict = json.JSONDecoder().decode(dst_plan)
82             for vnf in ignore_case_get(plan_dict, "vnfs"):
83                 vnfd_id = vnf['properties']['id']
84                 vnfm_type = vnf['properties'].get("nf_type", "undefined")
85                 vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
86                 vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
87
88                 params_vnf.append({
89                     "vnfProfileId": vnf["vnf_id"],
90                     "additionalParam": {
91                         "properties": json.JSONEncoder().encode(vnf['properties']),
92                         "vimId": vimid,
93                         "vnfmInstanceId": vnfm_info["vnfmId"],
94                         "vnfmType": vnfm_type,
95                         "inputs": params_json
96                     }
97                 })
98
99             self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
100             dst_plan = json.JSONEncoder().encode(plan_dict)
101             logger.debug('tosca plan dest add vimid:%s' % dst_plan)
102             NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
103
104             pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
105
106             vnf_params_json = json.JSONEncoder().encode(params_vnf)
107             plan_input = {
108                 'jobId': job_id,
109                 'nsInstanceId': self.ns_inst_id,
110                 'object_context': dst_plan,
111                 'object_additionalParamForNs': params_json,
112                 'object_additionalParamForVnf': vnf_params_json,
113                 'object_additionalParamForPnf': pnf_params_json
114             }
115             plan_input.update(**self.get_model_count(dst_plan))
116             plan_input["sdnControllerId"] = ignore_case_get(
117                 self.req_data['additionalParamForNs'], "sdncontroller")
118
119             ServiceBaseInfoModel(service_id=self.ns_inst_id,
120                                  service_name=ns_inst.name,
121                                  service_type='NFVO',
122                                  description=ns_inst.description,
123                                  active_status='--',
124                                  status=ns_inst.status,
125                                  creator='--',
126                                  create_time=int(time.time() * 1000)).save()
127
128             if config.WORKFLOW_OPTION == "wso2":
129                 service_tpl = get_servicetemplate(ns_inst.nsd_id)
130                 DefPkgMappingModel(service_id=self.ns_inst_id,
131                                    service_def_id=service_tpl['csarId'],
132                                    template_name=service_tpl['templateName'],
133                                    template_id=service_tpl['serviceTemplateId']).save()
134
135                 for key, val in self.req_data['additionalParamForNs'].items():
136                     InputParamMappingModel(service_id=self.ns_inst_id,
137                                            input_key=key,
138                                            input_value=val).save()
139
140                 for vnffg in ignore_case_get(plan_dict, "vnffgs"):
141                     VNFFGInstModel(vnffgdid=vnffg["vnffg_id"],
142                                    vnffginstid=str(uuid.uuid4()),
143                                    nsinstid=self.ns_inst_id,
144                                    endpointnumber=0).save()
145             else:
146                 # TODO:
147                 pass
148             logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
149             if config.WORKFLOW_OPTION == "wso2":
150                 return self.start_wso2_workflow(job_id, ns_inst, plan_input)
151             elif config.WORKFLOW_OPTION == "activiti":
152                 return self.start_activiti_workflow()
153             elif config.WORKFLOW_OPTION == "grapflow":
154                 return self.start_buildin_grapflow(job_id, plan_input)
155             else:
156                 return self.start_buildin_workflow(job_id, plan_input)
157
158         except Exception as e:
159             logger.error(traceback.format_exc())
160             logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
161             JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
162             return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
163
164     def start_wso2_workflow(self, job_id, ns_inst, plan_input):
165         servicetemplate_id = get_servicetemplate_id(ns_inst.nsd_id)
166         process_id = get_process_id('init', servicetemplate_id)
167         data = {"processId": process_id, "params": {"planInput": plan_input}}
168         logger.debug('ns-instant(%s) workflow data:%s' % (self.ns_inst_id, data))
169
170         ret = workflow_run(data)
171         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
172         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) wso2 workflow started: %s' % (
173             self.ns_inst_id, ret.get('status')))
174         if ret.get('status') == 1:
175             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
176         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
177
178     def start_activiti_workflow(self, job_id, plan_input):
179         plans = WFPlanModel.objects.filter()
180         if not plans:
181             raise NSLCMException("No plan is found, you should deploy plan first!")
182         data = {
183             "processId": plans[0].process_id,
184             "params": plan_input
185         }
186         ret = activiti.exec_workflow(data)
187         logger.info("ns-instant(%s) workflow result:%s" % (self.ns_inst_id, ret))
188         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) activiti workflow started: %s' % (
189             self.ns_inst_id, ret.get('status')))
190         if ret.get('status') == 1:
191             return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
192         return dict(data={'error': ret['message']}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
193
194     def start_buildin_workflow(self, job_id, plan_input):
195         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin workflow started.' % self.ns_inst_id)
196         BuildInWorkflowThread(plan_input).start()
197         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
198
199     def start_buildin_grapflow(self, job_id, plan_input):
200         JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
201         run_ns_instantiate(plan_input)
202         return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
203
204     @staticmethod
205     def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
206         for location in location_constraints:
207             if "vnfProfileId" in location and vnfdid == location["vnfProfileId"]:
208                 return location["locationConstraints"]["vimId"]
209         if vim_id:
210             return vim_id
211         raise NSLCMException("No Vim info is found for vnf(%s)." % vnfdid)
212
213     @staticmethod
214     def set_vl_vim_id(vim_id, location_constraints, plan_dict):
215         if "vls" not in plan_dict:
216             logger.debug("No vl is found in nsd.")
217             return
218         vl_vnf = {}
219         for vnf in ignore_case_get(plan_dict, "vnfs"):
220             if "dependencies" in vnf:
221                 for depend in vnf["dependencies"]:
222                     vl_vnf[depend["vl_id"]] = vnf['properties']['id']
223         vnf_vim = {}
224         for location in location_constraints:
225             if "vnfProfileId" in location:
226                 vnfd_id = location["vnfProfileId"]
227                 vnf_vim[vnfd_id] = location["locationConstraints"]["vimId"]
228         for vl in plan_dict["vls"]:
229             vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
230             vimid = ignore_case_get(vnf_vim, vnfdid)
231             if not vimid:
232                 vimid = vim_id
233             if not vimid:
234                 raise NSLCMException("No Vim info for vl(%s) of vnf(%s)." % (vl["vl_id"], vnfdid))
235             if "location_info" not in vl["properties"]:
236                 vl["properties"]["location_info"] = {}
237             vl["properties"]["location_info"]["vimid"] = vimid
238
239     @staticmethod
240     def get_model_count(context):
241         data = json.JSONDecoder().decode(context)
242         vls = len(data.get('vls', []))
243         sfcs = len(data.get('fps', []))
244         vnfs = len(data.get('vnfs', []))
245         return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs)}
246
247     def init_pnf_para(self, plan_dict):
248         pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
249         pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
250         logger.debug("addpnfData ; %s" % pnfs_in_input)
251         logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
252         pnfs = {}
253         for pnf in pnfs_in_input:
254             for pnfd in pnfs_in_nsd:
255                 if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
256                     k = pnfd["pnf_id"]
257                     pnf["nsInstances"] = self.ns_inst_id
258                     # todo pnf["pnfdInfoId"]
259                     pnfs[k] = {
260                         "type": "CreatePnf",
261                         "input": {
262                             "content": pnf
263                         }
264                     }
265         return pnfs