# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-def enum(**enums):
- return type('Enum', (), enums)
-# [VNF_STATUS]
+from lcm.pub.utils.jobutil import enum
+
VNF_STATUS = enum(NULL='null', INSTANTIATING="instantiating", INACTIVE='inactive', ACTIVE="active",
FAILED="failed", TERMINATING="terminating", SCALING="scaling", OPERATING="operating",
UPDATING="updating", HEALING="healing")
\ No newline at end of file
from lcm.nf.vnfs.vnf_cancel.term_vnf import TermVnf
from lcm.pub.database.models import NfInstModel, JobStatusModel, VmInstModel, NetworkInstModel, SubNetworkInstModel, \
- PortInstModel
+ PortInstModel, NfvoRegInfoModel
from lcm.pub.utils.jobutil import JobUtil
from lcm.pub.utils.timeutil import now_time
TermVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
self.assert_job_result(self.job_id, 255, "VnfInst(%s) does not exist" % self.nf_inst_id)
+ def test_instantiate_vnf_when_get_nfvo_config_failed(self):
+ NfInstModel.objects.create(nfinstid='1111', mnfinstid='1111', nf_name='2222',
+ package_id='todo', vnfm_inst_id='todo', version='', vendor='',
+ producttype='', netype='', vnfd_model='',
+ instantiationState='VNF_INSTANTIATED', nf_desc='', vnfdid='',
+ vnfSoftwareVersion='', vnfConfigurableProperties='todo',
+ localizationLanguage='EN_US', create_time=now_time())
+ data = {"terminationType": "FORCEFUL",
+ "gracefulTerminationTimeout": 120}
+ self.nf_inst_id = '1111'
+ self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
+ JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
+ TermVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
+ self.assert_job_result(self.job_id, 255, "Nfvo was not registered")
+
def test_terminate_vnf_success(self):
NfInstModel.objects.create(nfinstid='1111', mnfinstid='1111', nf_name='2222',
package_id='todo', vnfm_inst_id='todo', version='', vendor='',
instantiationState='VNF_INSTANTIATED', nf_desc='', vnfdid='',
vnfSoftwareVersion='', vnfConfigurableProperties='todo',
localizationLanguage='EN_US', create_time=now_time())
+ NfvoRegInfoModel.objects.create(nfvoid='nfvo111', vnfminstid='vnfm111', apiurl='http://10.74.44.11',
+ nfvouser='root', nfvopassword='root123')
data = {"terminationType": "FORCEFUL",
"gracefulTerminationTimeout": 120}
self.nf_inst_id = '1111'
import traceback
from threading import Thread
-from lcm.pub.config.enum import VNF_STATUS
+from lcm.nf.vnfs.const import VNF_STATUS
from lcm.pub.database.models import JobStatusModel, NfInstModel, VmInstModel, NetworkInstModel, StorageInstModel, \
- FlavourInstModel, PortInstModel, SubNetworkInstModel
+ FlavourInstModel, PortInstModel, SubNetworkInstModel, NfvoRegInfoModel
from lcm.pub.exceptions import NFLCMException
+from lcm.pub.msapi.nfvolcm import apply_grant_to_nfvo
from lcm.pub.utils.jobutil import JobUtil
-from lcm.pub.utils.timeutil import now_time
from lcm.pub.utils.values import ignore_case_get
logger = logging.getLogger(__name__)
def run(self):
try:
self.term_pre()
- self.grant_resource()
self.query_inst_resource(self.nf_inst_id)
+ # self.grant_resource()
JobUtil.add_job_status(self.job_id, 100, "Terminate Vnf success.")
is_exist = JobStatusModel.objects.filter(jobid=self.job_id).exists()
logger.debug("check_ns_inst_name_exist::is_exist=%s" % is_exist)
raise NFLCMException("No instantiated vnf")
if self.terminationType == 'GRACEFUL' and not self.gracefulTerminationTimeout:
raise NFLCMException("Graceful termination must set timeout")
+ # get nfvo info
+ JobUtil.add_job_status(self.job_id, 5, 'Get nfvo connection info')
+ self.load_nfvo_config()
+
+ NfInstModel.objects.filter(nfinstid=self.nf_inst_id).update(status=VNF_STATUS.TERMINATING)
+
JobUtil.add_job_status(self.job_id, 10, 'Nf terminating pre-check finish')
logger.info("Nf terminating pre-check finish")
- NfInstModel.objects.filter(nfinstid=self.nf_inst_id).update(status=VNF_STATUS.TERMINATING)
def query_inst_resource(self, inst_id):
logger.info('[query_resource begin]:inst_id=%s' % inst_id)
logger.info('[query_vm_resource]:ret_vms=%s' % self.inst_resource['vm'])
def grant_resource(self):
- pass
+ logger.info("nf_cancel_task grant_resource begin")
+ JobUtil.add_job_status(self.job_id, 30, 'nf_cancel_task grant_resource')
+ reg_info = NfvoRegInfoModel.objects.filter(vnfminstid=self.vnfm_inst_id).first()
+ nf_info = NfInstModel.objects.filter(nfinstid=self.vnf_inst_id).first()
+ content_args = {'nfvoInstanceId': reg_info.nfvoid, 'vnfmInstanceId': self.vnfm_inst_id,
+ 'nfInstanceId': self.vnf_inst_id, 'nfDescriptorId': nf_info.vnf_id,
+ 'lifecycleOperation': 'Terminal', 'jobId': '', 'addResource': [],
+ 'removeResource': [], 'placementConstraint': [], 'exVimIdList': [], 'additionalParam': {}}
+
+ content_args['removeResource'] = self.get_grant_data()
+
+ logger.info('content_args=%s' % content_args)
+ rsp = apply_grant_to_nfvo(content_args)
+ logger.info("nf_cancel_task grant_resource rsp: %s" % rsp)
+ if rsp[0] != 0:
+ logger.error("nf_cancel_task grant_resource failed.[%s]" % str(rsp[1]))
+ logger.info("nf_cancel_task grant_resource end")
+
+ def load_nfvo_config(self):
+ logger.info("[NF instantiation]get nfvo connection info start")
+ reg_info = NfvoRegInfoModel.objects.filter(vnfminstid='vnfm111').first()
+ if reg_info:
+ self.vnfm_inst_id = reg_info.vnfminstid
+ self.nfvo_inst_id = reg_info.nfvoid
+ logger.info("[NF instantiation] Registered nfvo id is [%s]" % self.nfvo_inst_id)
+ else:
+ raise NFLCMException("Nfvo was not registered")
+ logger.info("[NF instantiation]get nfvo connection info end")