1 # Copyright 2017 ZTE Corporation.
\r
3 # Licensed under the Apache License, Version 2.0 (the "License");
\r
4 # you may not use this file except in compliance with the License.
\r
5 # You may obtain a copy of the License at
\r
7 # http://www.apache.org/licenses/LICENSE-2.0
\r
9 # Unless required by applicable law or agreed to in writing, software
\r
10 # distributed under the License is distributed on an "AS IS" BASIS,
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 # See the License for the specific language governing permissions and
\r
13 # limitations under the License.
\r
20 from django.test import TestCase, Client
\r
21 from rest_framework import status
\r
23 from lcm.nf.biz.terminate_vnf import TerminateVnf
\r
24 from lcm.pub.database.models import NfInstModel, JobStatusModel, VmInstModel, NetworkInstModel, SubNetworkInstModel, \
\r
25 PortInstModel, FlavourInstModel, StorageInstModel
\r
26 from lcm.pub.utils import restcall
\r
27 from lcm.pub.utils.jobutil import JobUtil
\r
28 from lcm.pub.utils.timeutil import now_time
\r
29 from lcm.pub.vimapi import api
\r
32 class TestNFTerminate(TestCase):
\r
34 self.client = Client()
\r
35 StorageInstModel.objects.create(storageid="1",
\r
41 NetworkInstModel.objects.create(networkid='1',
\r
44 name='pnet_network',
\r
49 SubNetworkInstModel.objects.create(subnetworkid='1',
\r
58 PortInstModel.objects.create(portid='1',
\r
68 FlavourInstModel.objects.create(flavourid="1",
\r
73 VmInstModel.objects.create(vmid="1",
\r
83 VmInstModel.objects.all().delete()
\r
84 NetworkInstModel.objects.all().delete()
\r
85 SubNetworkInstModel.objects.all().delete()
\r
86 PortInstModel.objects.all().delete()
\r
88 def assert_job_result(self, job_id, job_progress, job_detail):
\r
89 jobs = JobStatusModel.objects.filter(jobid=job_id,
\r
90 progress=job_progress,
\r
92 self.assertEqual(1, len(jobs))
\r
94 @mock.patch.object(TerminateVnf, 'run')
\r
95 def test_terminate_vnf(self, mock_run):
\r
97 "terminationType": "GRACEFUL",
\r
98 "gracefulTerminationTimeout": 120
\r
100 NfInstModel(nfinstid='12', nf_name='VNF1').save()
\r
101 mock_run.re.return_value = None
\r
102 response = self.client.post("/api/vnflcm/v1/vnf_instances/12/terminate", data=req_data, format='json')
\r
103 self.failUnlessEqual(status.HTTP_202_ACCEPTED, response.status_code)
\r
105 def test_terminate_vnf_when_inst_id_not_exist(self):
\r
107 "terminationType": "GRACEFUL",
\r
108 "gracefulTerminationTimeout": 120
\r
110 self.nf_inst_id = str(uuid.uuid4())
\r
111 self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
\r
112 JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
\r
113 TerminateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
\r
114 self.assert_job_result(self.job_id, 100, "Terminate Vnf success.")
\r
116 @mock.patch.object(restcall, 'call_req')
\r
117 @mock.patch.object(api, 'call')
\r
118 def test_terminate_vnf_success(self, mock_call, mock_call_req):
\r
119 NfInstModel.objects.create(nfinstid='1111',
\r
127 status='VNF_INSTANTIATED',
\r
130 vnfSoftwareVersion='',
\r
131 vnfConfigurableProperties='todo',
\r
132 localizationLanguage='EN_US',
\r
133 create_time=now_time())
\r
134 t1_apply_grant_result = [0, json.JSONEncoder().encode(
\r
137 "vnfInstanceId": "1",
\r
138 "vnfLcmOpOccId": "2",
\r
139 "vimConnections": [
\r
146 t2_lcm_notify_result = [0, json.JSONEncoder().encode(''), '200']
\r
147 t3_delete_flavor = [0, json.JSONEncoder().encode({"vim_id": "vimid_1"}), '200']
\r
148 mock_call_req.side_effect = [t1_apply_grant_result, t2_lcm_notify_result, t3_delete_flavor]
\r
149 mock_call.return_value = None
\r
151 "terminationType": "FORCEFUL",
\r
152 "gracefulTerminationTimeout": 120
\r
154 self.nf_inst_id = '1111'
\r
155 self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
\r
156 JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
\r
157 TerminateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
\r
158 self.assert_job_result(self.job_id, 100, "Terminate Vnf success.")
\r