1 # Copyright 2017 ZTE Corporation.
\r
3 # Licensed under the Apache License, Version 2.0 (the "License");
\r
4 # you may not use this file except in compliance with the License.
\r
5 # You may obtain a copy of the License at
\r
7 # http://www.apache.org/licenses/LICENSE-2.0
\r
9 # Unless required by applicable law or agreed to in writing, software
\r
10 # distributed under the License is distributed on an "AS IS" BASIS,
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
12 # See the License for the specific language governing permissions and
\r
13 # limitations under the License.
\r
19 from django.test import TestCase
\r
20 from rest_framework import status
\r
21 from rest_framework.test import APIClient
\r
23 from lcm.nf.biz.instantiate_vnf import InstantiateVnf
\r
24 from lcm.nf.const import c1_data_get_tenant_id, c4_data_create_network, c2_data_create_volume, \
\r
25 c5_data_create_subnet, c3_data_get_volume, c6_data_create_port, c7_data_create_flavor, c8_data_list_image, \
\r
26 c9_data_create_vm, c10_data_get_vm, inst_req_data, vnfpackage_info
\r
27 from lcm.pub.database.models import NfInstModel, JobStatusModel, SubscriptionModel
\r
28 from lcm.pub.utils import restcall
\r
29 from lcm.pub.utils.jobutil import JobUtil
\r
30 from lcm.pub.utils.timeutil import now_time
\r
31 from lcm.pub.utils.notificationsutil import NotificationsUtil
\r
32 from lcm.pub.vimapi import api
\r
35 class TestNFInstantiate(TestCase):
\r
37 self.client = APIClient()
\r
38 self.grant_result = {
\r
44 "tenant": 'chinamobile'
\r
48 "vnfId": "413aa1fe-b4d1-11e8-8268-dff5aab95c63",
\r
51 "computeResourceFlavours": [
\r
53 "resourceProviderId": "vgw",
\r
54 "vimFlavourId": "yui",
\r
55 "vimConnectionId": ""
\r
64 def assert_job_result(self, job_id, job_progress, job_detail):
\r
65 jobs = JobStatusModel.objects.filter(jobid=job_id,
\r
66 progress=job_progress,
\r
68 self.assertEqual(1, len(jobs))
\r
70 @mock.patch.object(InstantiateVnf, 'run')
\r
71 def test_instantiate_vnf(self, mock_run):
\r
72 NfInstModel(nfinstid='12', nf_name='VNF1', status='UN_INSTANTIATED').save()
\r
73 mock_run.re.return_value = None
\r
74 response = self.client.post('/api/vnflcm/v1/vnf_instances/12/instantiate', data=inst_req_data, format='json')
\r
75 self.failUnlessEqual(status.HTTP_202_ACCEPTED, response.status_code)
\r
77 def test_instantiate_vnf_when_inst_id_not_exist(self):
\r
78 self.nf_inst_id = str(uuid.uuid4())
\r
79 self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
\r
80 JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')
\r
81 jobs = JobStatusModel.objects.filter(jobid=self.job_id,
\r
83 descp='INST_VNF_READY')
\r
84 self.assertEqual(1, len(jobs))
\r
85 data = inst_req_data
\r
86 InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
\r
87 self.assert_job_result(self.job_id, 255, 'VNF nf_inst_id is not exist.')
\r
89 def test_instantiate_vnf_when_already_instantiated(self):
\r
90 NfInstModel.objects.create(nfinstid='1111',
\r
97 status='INSTANTIATED',
\r
98 nf_desc='vFW in Nanjing TIC Edge',
\r
100 create_time=now_time())
\r
101 self.nf_inst_id = '1111'
\r
102 self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
\r
103 JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')
\r
104 jobs = JobStatusModel.objects.filter(jobid=self.job_id,
\r
106 descp='INST_VNF_READY')
\r
107 self.assertEqual(1, len(jobs))
\r
108 data = inst_req_data
\r
109 InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
\r
110 self.assert_job_result(self.job_id, 255, 'VNF instantiationState is not NOT_INSTANTIATED.')
\r
112 @mock.patch.object(restcall, 'call_req')
\r
113 def test_instantiate_vnf_when_get_packageinfo_by_csarid_failed(self, mock_call_req):
\r
114 NfInstModel.objects.create(nfinstid='1111',
\r
121 status='NOT_INSTANTIATED',
\r
122 nf_desc='vFW in Nanjing TIC Edge',
\r
124 create_time=now_time())
\r
125 r1_get_vnfpackage_by_vnfdid = [1, json.JSONEncoder().encode(vnfpackage_info), '200']
\r
126 mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid]
\r
127 self.nf_inst_id = '1111'
\r
128 self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
\r
129 JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')
\r
130 data = inst_req_data
\r
131 InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
\r
132 self.assert_job_result(self.job_id, 255, 'Failed to query vnf CSAR(111) from catalog.')
\r
134 @mock.patch.object(restcall, 'call_req')
\r
135 def test_instantiate_vnf_when_applay_grant_failed(self, mock_call_req):
\r
136 NfInstModel.objects.create(nfinstid='1111',
\r
143 status='NOT_INSTANTIATED',
\r
144 nf_desc='vFW in Nanjing TIC Edge',
\r
146 create_time=now_time())
\r
147 r1_get_vnfpackage_by_vnfdid = [0, json.JSONEncoder().encode(vnfpackage_info), '200']
\r
148 r2_apply_grant_result = [1, json.JSONEncoder().encode(self.grant_result), '200']
\r
149 mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result]
\r
150 self.nf_inst_id = '1111'
\r
151 self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
\r
152 JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')
\r
153 data = inst_req_data
\r
154 InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
\r
155 self.assert_job_result(self.job_id, 255, 'Nf instancing apply grant exception')
\r
157 @mock.patch.object(restcall, 'call_req')
\r
158 @mock.patch.object(api, 'call')
\r
159 def test_instantiate_vnf_when_unexpected_exception(self, mock_call, mock_call_req):
\r
160 NfInstModel.objects.create(nfinstid='1111',
\r
167 status='NOT_INSTANTIATED',
\r
168 nf_desc='vFW in Nanjing TIC Edge',
\r
170 create_time=now_time())
\r
171 r1_get_vnfpackage_by_vnfdid = [0, json.JSONEncoder().encode(vnfpackage_info), '200']
\r
172 r2_apply_grant_result = [0, json.JSONEncoder().encode(self.grant_result), '200']
\r
173 mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result]
\r
174 mock_call.side_effect = [c1_data_get_tenant_id, c2_data_create_volume, c3_data_get_volume]
\r
175 self.nf_inst_id = '1111'
\r
176 self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
\r
177 JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')
\r
178 data = inst_req_data
\r
179 InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
\r
180 self.assert_job_result(self.job_id, 255, 'unexpected exception')
\r
182 @mock.patch.object(restcall, 'call_req')
\r
183 @mock.patch.object(api, 'call')
\r
184 @mock.patch.object(NotificationsUtil, 'post_notification')
\r
185 def test_instantiate_vnf_success(self, mock_post_notification, mock_call, mock_call_req):
\r
186 NfInstModel.objects.create(nfinstid='1111',
\r
193 status='NOT_INSTANTIATED',
\r
194 nf_desc='vFW in Nanjing TIC Edge',
\r
196 create_time=now_time())
\r
197 SubscriptionModel.objects.create(
\r
198 subscription_id=str(uuid.uuid4()),
\r
199 callback_uri='api/gvnfmdriver/v1/vnfs/lifecyclechangesnotification',
\r
200 auth_info=json.JSONEncoder().encode({
\r
201 'authType': ['BASIC'],
\r
203 'userName': 'username',
\r
204 'password': 'password'
\r
207 notification_types=str([
\r
208 'VnfLcmOperationOccurrenceNotification',
\r
209 'VnfIdentifierCreationNotification',
\r
210 'VnfIdentifierDeletionNotification'
\r
212 operation_types=str(['INSTANTIATE']),
\r
213 operation_states=str(['COMPLETED']),
\r
214 vnf_instance_filter=json.JSONEncoder().encode({
\r
215 'vnfdIds': ['111'],
\r
216 'vnfProductsFromProviders': [],
\r
217 'vnfInstanceIds': ['1111'],
\r
218 'vnfInstanceNames': [],
\r
221 r1_get_vnfpackage_by_vnfdid = [0, json.JSONEncoder().encode(vnfpackage_info), '200']
\r
222 r2_apply_grant_result = [0, json.JSONEncoder().encode(self.grant_result), '200']
\r
223 r3_all_aai_result = [1, json.JSONEncoder().encode(''), '404']
\r
224 r4_lcm_notify_result = [0, json.JSONEncoder().encode(''), '200']
\r
225 mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result, r3_all_aai_result, r4_lcm_notify_result]
\r
226 mock_call.side_effect = [c1_data_get_tenant_id,
\r
227 c2_data_create_volume, c3_data_get_volume,
\r
228 c4_data_create_network,
\r
229 c5_data_create_subnet,
\r
230 c6_data_create_port,
\r
231 c7_data_create_flavor,
\r
232 c8_data_list_image, c9_data_create_vm, c10_data_get_vm]
\r
233 mock_post_notification.side_effect = None
\r
234 self.nf_inst_id = '1111'
\r
235 self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
\r
236 JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')
\r
237 data = inst_req_data
\r
238 InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
\r
239 # self.assert_job_result(self.job_id, 100, 'Instantiate Vnf success.')
\r