1 # Copyright (C) 2018 Verizon. All Rights Reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from threading import Thread
20 from lcm.nf.biz import common
21 from lcm.nf.biz.grant_vnf import grant_resource
22 from lcm.nf.const import VNF_STATUS, GRANT_TYPE, CHANGE_TYPE
23 from lcm.nf.const import RESOURCE_MAP, OPERATION_STATE_TYPE
24 from lcm.nf.const import INSTANTIATION_STATE
25 from lcm.pub.database.models import NfInstModel
26 from lcm.pub.database.models import VNFCInstModel, PortInstModel
27 from lcm.pub.database.models import VmInstModel
28 from lcm.pub.exceptions import NFLCMException
29 from lcm.pub.utils.notificationsutil import NotificationsUtil, prepare_notification
30 from lcm.pub.utils.values import ignore_case_get
31 from lcm.pub.utils.jobutil import JobUtil
32 from lcm.pub.utils.timeutil import now_time
33 from lcm.pub.vimapi import adaptor
# Module-level logger, named after this module via __name__.
# NOTE(review): the `import logging` line is not visible in this view.
35 logger = logging.getLogger(__name__)
# ScaleVnf is a Thread subclass that carries out one VNF scale request.
# NOTE(review): lines appear to be missing from this view inside __init__;
# the assignments of self.data and self.job_id, which later methods read,
# are not visible here. Confirm against the full file.
40 class ScaleVnf(Thread):
# Parameters: data (presumably the scale request body; later read with
# .get("type"), .get("aspectId"), ...), nf_inst_id, job_id.
41 def __init__(self, data, nf_inst_id, job_id):
42 super(ScaleVnf, self).__init__()
44 self.nf_inst_id = nf_inst_id
# Queryset of NfInstModel rows for this instance id; other methods reuse it
# to read the stored VNFD / cache fields and to update status columns.
46 self.vnf_insts = NfInstModel.objects.filter(nfinstid=self.nf_inst_id)
# NOTE(review): the enclosing `def` line and the `try:` matching the except
# clauses below are not visible in this view (presumably Thread.run; confirm),
# and each add_job_status call is missing at least one argument line.
# Visible flow: report job status before the grant step and before the scale
# step, then mark the VNF instance rows as INSTANTIATED with a fresh timestamp.
51 JobUtil.add_job_status(self.job_id,
53 "Start to apply grant.")
55 JobUtil.add_job_status(self.job_id,
57 "Start to scale Vnf.")
# The message argument(s) of this third call are not visible here.
59 JobUtil.add_job_status(self.job_id,
62 self.vnf_insts.update(status=INSTANTIATION_STATE.INSTANTIATED,
63 lastuptime=now_time())
# Both handlers log the error and delegate to vnf_scale_failed_handle.
# NOTE(review): `e.message` is not available on built-in exceptions in
# Python 3; verify the target runtime.
64 except NFLCMException as e:
65 logger.error(e.message)
66 self.vnf_scale_failed_handle(e.message)
# Unexpected errors additionally get their traceback logged.
67 except Exception as e:
68 logger.error(e.message)
69 logger.error(traceback.format_exc())
70 self.vnf_scale_failed_handle(e.message)
# NOTE(review): the `def` line owning these statements is not visible in this
# view. The statements read the scale request from self.data and derive the
# scaling parameters from the VNFD stored on the first NF instance row.
73 self.scale_type = self.data.get("type")
74 self.aspect_id = self.data.get("aspectId")
# DEFAULT_STEPS is used when "numberOfSteps" is absent; its definition is not
# visible in this view.
75 self.number_of_steps = int(self.data.get("numberOfSteps", DEFAULT_STEPS))
76 self.additional_params = self.data.get("additionalParams", {})
77 self.is_scale_in = (self.scale_type == GRANT_TYPE.SCALE_IN)
# vnfd_model is stored as a JSON string on the NfInstModel row.
78 self.vnfd_info = json.loads(self.vnf_insts[0].vnfd_model)
# step_delta must be resolved first: get_vdu_scale_aspect_deltas reads it.
79 self.step_delta = self.get_scale_step_delta()
80 self.target_vdu, self.step_inst_num = self.get_vdu_scale_aspect_deltas()
# Total instances to add/remove = requested steps * instances per step.
81 self.scale_inst_num = self.number_of_steps * self.step_inst_num
82 self.min_instance_num, self.max_instance_num = self.get_instance_range()
# Raises NFLCMException when the request would leave the allowed range.
83 self.check_if_can_scale()
# Starts empty; do_operation assigns a new dict on a path whose branch lines
# are not visible here.
84 self.scale_out_resource = {}
# Requests a grant for the scale operation and applies the granted location
# to the VNFD resources via set_location.
# NOTE(review): some keyword-argument lines of the grant_resource call are not
# visible in this view (the call is not closed in the visible lines).
86 def apply_grant(self):
87 logger.debug("Start scale apply grant")
88 vdus = ignore_case_get(self.vnfd_info, "vdus")
# Keep only the VDU(s) targeted by the scaling aspect ...
89 scale_vdus = [vdu for vdu in vdus if vdu["vdu_id"] == self.target_vdu]
# ... and repeat that list once per instance being scaled (the repeated
# entries reference the same dict objects).
90 scale_vdus = scale_vdus * self.scale_inst_num
91 grant_result = grant_resource(data=self.data,
92 nf_inst_id=self.nf_inst_id,
94 grant_type=self.scale_type,
96 logger.debug("Scale Grant result: %s", grant_result)
97 self.set_location(grant_result)
# Performs the actual scaling against the VIM: deletes resources via
# adaptor.delete_vim_res, or prepares a trimmed VNFD and creates resources via
# adaptor.create_vim_res.
# NOTE(review): the lines that select between the two paths, the body of the
# scale_out_resource dict literal, the guard around the CP lookup and some
# create_vim_res arguments are not visible in this view. Do not restructure
# without the full file.
99 def do_operation(self):
100 logger.debug("Start %s VNF resource", self.scale_type)
101 logger.debug('VnfdInfo = %s' % self.vnfd_info)
# Delete path: get_scale_in_resource fills affected_vnfcs as a side effect;
# do_notify_del_vim_res is passed as the per-resource callback.
104 self.affected_vnfcs = []
105 self.scale_in_resource = self.get_scale_in_resource(self.affected_vnfcs)
106 adaptor.delete_vim_res(self.scale_in_resource, self.do_notify_del_vim_res)
# Create path: the keys of this dict literal are not visible here;
# do_notify_create_vim_res later appends to self.scale_out_resource[res_type].
108 self.scale_out_resource = {
# Volumes and virtual links are emptied in the VNFD handed to create_vim_res.
116 self.vnfd_info["volume_storages"] = []
117 self.vnfd_info["vls"] = []
# NOTE(review): list * n repeats references to the SAME cp dicts, so the loop
# below visits each dict n times and every copy ends up sharing one name.
# Confirm whether distinct copies were intended.
118 self.vnfd_info["cps"] = self.vnfd_info["cps"] * self.scale_inst_num
119 for cp in self.vnfd_info["cps"]:
120 # TODO: how to set name for scale_out cp
121 cp["properties"]["name"] = cp["cp_id"] + str(uuid.uuid4())
# Look up an existing port whose name starts with the cp_id and copy its
# network/subnet onto the cp.
122 cp_inst = PortInstModel.objects.filter(name__startswith=cp["cp_id"]).first()
124 cp["networkId"] = cp_inst.networkid
125 cp["subnetId"] = cp_inst.subnetworkid
# The condition guarding this raise is not visible; presumably it fires when
# no matching port instance was found.
127 raise NFLCMException("CP(%s) does not exist" % cp["cp_id"])
# NOTE(review): same aliasing as for cps. Each vdu dict is visited n times, so
# its name gets n uuid suffixes appended.
128 self.vnfd_info["vdus"] = self.vnfd_info["vdus"] * self.scale_inst_num
129 for vdu in self.vnfd_info["vdus"]:
130 # TODO: how to set name for scale_out vdu
131 vdu["properties"]["name"] = vdu["properties"]["name"] + str(uuid.uuid4())
# vimInfo / resInfo are stored as JSON strings on the NF instance row.
133 vim_cache = json.loads(self.vnf_insts[0].vimInfo)
134 res_cache = json.loads(self.vnf_insts[0].resInfo)
# Remaining arguments of this call are not visible in this view.
135 adaptor.create_vim_res(self.vnfd_info,
136 self.do_notify_create_vim_res,
# Write the caches back to the NF instance row(s) as JSON.
139 self.vnf_insts.update(vimInfo=json.dumps(vim_cache),
140 resInfo=json.dumps(res_cache))
141 logger.debug("%s VNF resource finish", self.scale_type)
# Builds a COMPLETED operation notification for this scale job, attaches the
# affected VNFCs and sends it through NotificationsUtil.
# NOTE(review): one prepare_notification argument line and the lines choosing
# between the two ways of filling affectedVnfcs are not visible in this view.
143 def send_notification(self):
144 data = prepare_notification(nfinstid=self.nf_inst_id,
146 operation=self.op_type,
147 operation_state=OPERATION_STATE_TYPE.COMPLETED)
149 # TODO: need set changedExtConnectivity for data
# Uses the list collected by do_operation on the delete path.
151 data["affectedVnfcs"] = self.affected_vnfcs
# For each VM recorded in scale_out_resource, append its VNFC entry.
153 for vm in self.scale_out_resource["vm"]:
154 self.set_affected_vnfcs(data["affectedVnfcs"], vm["res_id"])
156 logger.debug('Notify request data = %s' % data)
157 NotificationsUtil().send_notification(data)
# Rolls back a scale operation by deleting the resources recorded in
# scale_out_resource, reusing do_notify_del_vim_res as the delete callback.
# NOTE(review): the statement(s) the comment below refers to (presumably an
# early exit for scale-in) are not visible in this view.
159 def rollback_operation(self):
161 # SCALE_IN operation does not support rollback
163 adaptor.delete_vim_res(self.scale_out_resource, self.do_notify_del_vim_res)
def set_location(self, grant_result):
    """Copy the granted VIM id and tenant onto every VNFD resource.

    Only the first entry of the grant's ``vimConnections`` is used: its
    ``vimId`` and the ``tenant`` found in its ``accessInfo`` are written into
    ``properties["location_info"]`` of each resource listed under ``vdus``,
    ``vls``, ``cps`` and ``volume_storages`` in ``self.vnfd_info``. A missing
    ``location_info`` dict is created on the fly.

    :param grant_result: grant response containing ``vimConnections``.
    """
    first_connection = ignore_case_get(grant_result, "vimConnections")[0]
    tenant_name = ignore_case_get(
        ignore_case_get(first_connection, "accessInfo"), "tenant")
    vim_id = ignore_case_get(first_connection, "vimId")

    for section in ('vdus', 'vls', 'cps', 'volume_storages'):
        for item in ignore_case_get(self.vnfd_info, section):
            location = item["properties"].setdefault("location_info", {})
            location["vimid"] = vim_id
            location["tenant"] = tenant_name
def do_notify_create_vim_res(self, res_type, ret):
    """Persist a newly created VIM resource and record it for later cleanup.

    Passed to ``adaptor.create_vim_res`` as its notification callback.

    :param res_type: resource kind; selects the ``common.<res_type>_save``
        helper and the bucket in ``self.scale_out_resource``.
    :param ret: creation result handed over by the adaptor; forwarded to the
        save helper and to ``gen_del_resource``.
    :raises AttributeError: if ``common`` has no ``<res_type>_save``.
    :raises KeyError: if ``self.scale_out_resource`` has no ``res_type`` key.
    """
    # Lazy %-style arguments, consistent with the other logger calls in this
    # class; the message is only formatted when DEBUG is enabled.
    logger.debug('Scaling out [%s] resource', res_type)
    save_resource = getattr(common, res_type + '_save')
    save_resource(self.job_id, self.nf_inst_id, ret)
    # Stored in the delete-request shape so rollback_operation can hand the
    # whole structure straight to adaptor.delete_vim_res.
    self.scale_out_resource[res_type].append(self.gen_del_resource(ret))
# Delete callback passed to adaptor.delete_vim_res: removes the database rows
# that describe a VIM resource which has just been deleted.
# NOTE(review): a line appears to be missing from this view before the final
# VNFCInstModel delete (possibly a condition on res_type). Confirm against the
# full file.
184 def do_notify_del_vim_res(self, res_type, res_id):
185 logger.debug('Scaling in [%s] resource, resourceid [%s]', res_type, res_id)
# Reverse lookup: find the RESOURCE_MAP key whose value equals res_type.
# NOTE(review): indexing keys() and calling values().index() only works where
# they return lists (Python 2); on Python 3 they are views. Verify the runtime.
186 resource_type = RESOURCE_MAP.keys()[RESOURCE_MAP.values().index(res_type)]
# Resolve the model class by name from this module's globals, e.g.
# "<key>InstModel"; yields None if no such name is imported here.
187 resource_table = globals().get(resource_type + 'InstModel')
188 resource_table.objects.filter(instid=self.nf_inst_id, resourceid=res_id).delete()
# Also drop VNFC rows bound to this resource id as their VM.
190 VNFCInstModel.objects.filter(instid=self.nf_inst_id, vmid=res_id).delete()
# Chooses which VMs to remove and returns them in the structure handed to
# adaptor.delete_vim_res; also appends the matching VNFC entries to the
# caller-supplied affected_vnfcs list.
# NOTE(review): the body of the scale_in_resource dict literal is not visible
# in this view; only its "vm" list is used by the visible code.
192 def get_scale_in_resource(self, affected_vnfcs):
193 scale_in_resource = {
# All VM rows of this NF instance; no explicit ordering is applied here.
201 scale_in_vms = VmInstModel.objects.filter(instid=self.nf_inst_id)
202 vms_count = scale_in_vms.count()
# Walk backwards from the last row, taking scale_inst_num VMs.
# NOTE(review): vm_index becomes negative if scale_inst_num > vms_count.
203 for index in range(self.scale_inst_num):
204 vm_index = vms_count - index - 1
205 scale_in_resource["vm"].append(self.gen_del_resource(scale_in_vms[vm_index]))
206 self.set_affected_vnfcs(affected_vnfcs, scale_in_vms[vm_index].resourceid)
207 return scale_in_resource
# Normalises a resource description into the vim_id / tenant_id / res_id /
# is_predefined fields. Accepts either a dict (keys vimId, tenantId, id,
# returnCode) or an object with attributes vimid, tenant, resourceid,
# is_predefined.
# NOTE(review): the lines opening and closing the returned dict literal are
# not visible in this view.
209 def gen_del_resource(self, res):
210 is_dict = isinstance(res, dict)
212 "vim_id": res["vimId"] if is_dict else res.vimid,
213 "tenant_id": res["tenantId"] if is_dict else res.tenant,
214 "res_id": res["id"] if is_dict else res.resourceid,
# For dict input the "returnCode" value is used as the is_predefined flag.
215 "is_predefined": res["returnCode"] if is_dict else res.is_predefined
# Looks through the VNFD policies of type tosca.policies.nfv.ScalingAspects
# and returns the first entry of "step_deltas" for self.aspect_id.
# Raises NFLCMException when the aspect is not found in any such policy.
# NOTE(review): the statement under the type check is not visible in this
# view (presumably it skips non-matching policies).
218 def get_scale_step_delta(self):
219 for policy in self.vnfd_info.get("policies", []):
220 if policy.get("type") != "tosca.policies.nfv.ScalingAspects":
222 aspects = policy["properties"]["aspects"]
223 if self.aspect_id in aspects:
224 return aspects.get(self.aspect_id).get("step_deltas")[0]
225 raise NFLCMException("Aspect(%s) does not exist" % self.aspect_id)
# Looks through the VNFD policies of type
# tosca.policies.nfv.VduScalingAspectDeltas for the delta named by
# self.step_delta, reading the policy's first target and the delta's
# number_of_instances. Raises NFLCMException when no policy has that delta.
# NOTE(review): the statement under the type check and the return statement
# are not visible in this view; the caller unpacks two values
# (target_vdu, step_inst_num).
227 def get_vdu_scale_aspect_deltas(self):
228 for policy in self.vnfd_info.get("policies", []):
229 if policy.get("type") != "tosca.policies.nfv.VduScalingAspectDeltas":
231 target = policy.get("targets")[0]
232 deltas = policy["properties"]["deltas"]
233 if self.step_delta in deltas:
234 num = int(deltas.get(self.step_delta).get("number_of_instances"))
236 raise NFLCMException("Aspect step delta(%s) does not exist" % self.step_delta)
def get_instance_range(self):
    """Return the allowed instance-count range of the target VDU.

    Finds the first VDU in ``self.vnfd_info["vdus"]`` whose ``vdu_id`` equals
    ``self.target_vdu`` and reads ``min_number_of_instances`` and
    ``max_number_of_instances`` from its ``vdu_profile``.

    :returns: ``(min, max)`` as a tuple of ints.
    :raises NFLCMException: if no VDU matches ``self.target_vdu``.
    """
    candidates = (
        entry for entry in self.vnfd_info["vdus"]
        if entry["vdu_id"] == self.target_vdu
    )
    matched = next(candidates, None)
    if matched is None:
        raise NFLCMException("VDU(%s) does not exist" % self.target_vdu)

    profile = matched["properties"]["vdu_profile"]
    lower = int(profile["min_number_of_instances"])
    upper = int(profile["max_number_of_instances"])
    return lower, upper
# Validates the requested scale against the VDU's min/max instance numbers,
# using the current count of VNFCInstModel rows for this NF instance.
# Raises NFLCMException when the resulting count would fall below the minimum
# or exceed the maximum.
# NOTE(review): the lines selecting which of the two checks applies
# (presumably scale-in vs scale-out) are not visible in this view.
247 def check_if_can_scale(self):
248 cur_inst_num = VNFCInstModel.objects.filter(instid=self.nf_inst_id).count()
# Removing scale_inst_num instances must not go below the minimum.
250 if cur_inst_num - self.scale_inst_num < self.min_instance_num:
251 msg = "VNF(%s) cannot be scaled: less than min instance."
252 raise NFLCMException(msg % self.nf_inst_id)
# Adding scale_inst_num instances must not exceed the maximum.
254 if cur_inst_num + self.scale_inst_num > self.max_instance_num:
255 msg = "VNF(%s) cannot be scaled: max instance exceeded."
256 raise NFLCMException(msg % self.nf_inst_id)
# Appends one affected-VNFC entry for the given VM to affected_vnfcs (the list
# is mutated in place). The entry carries the VNFC id, its VDU id, the change
# type and a compute-resource description of the VM.
# NOTE(review): the lines that open/close the vm_resource dict and close the
# append call are not visible in this view.
258 def set_affected_vnfcs(self, affected_vnfcs, vm_id):
# REMOVED when scaling in, ADDED otherwise.
259 chgtype = CHANGE_TYPE.REMOVED if self.is_scale_in else CHANGE_TYPE.ADDED
260 vnfcs = VNFCInstModel.objects.filter(instid=self.nf_inst_id, vmid=vm_id)
261 vm = VmInstModel.objects.filter(instid=self.nf_inst_id, resourceid=vm_id)
# Only the first matching row of each queryset is read below; indexing [0]
# fails if the query returned no rows.
265 'vimConnectionId': vm[0].vimid,
266 'resourceId': vm[0].resourceid,
267 'vimLevelResourceType': 'vm'
270 affected_vnfcs.append({
271 'id': vnfcs[0].vnfcinstanceid,
272 'vduId': vnfcs[0].vduid,
273 'changeType': chgtype,
274 'computeResource': vm_resource
def vnf_scale_failed_handle(self, error_msg):
    """Record a failed scale operation on the VNF rows and on the job.

    Logs the error, sets the instance status to ``VNF_STATUS.FAILED`` with a
    fresh ``lastuptime``, then reports ``error_msg`` on the job with the
    value 255 (presumably the job-failure progress marker; confirm against
    JobUtil).

    :param error_msg: human-readable failure description.
    """
    logger.error('VNF scaling failed, detail message: %s', error_msg)
    failure_fields = {
        "status": VNF_STATUS.FAILED,
        "lastuptime": now_time(),
    }
    self.vnf_insts.update(**failure_fields)
    JobUtil.add_job_status(self.job_id, 255, error_msg)