Refactor code for the instantiate-VNF unit tests
[vfc/gvnfm/vnflcm.git] / lcm / lcm / nf / tests / test_instantiate_vnf.py
1 # Copyright 2017 ZTE Corporation.\r
2 #\r
3 # Licensed under the Apache License, Version 2.0 (the "License");\r
4 # you may not use this file except in compliance with the License.\r
5 # You may obtain a copy of the License at\r
6 #\r
7 #         http://www.apache.org/licenses/LICENSE-2.0\r
8 #\r
9 # Unless required by applicable law or agreed to in writing, software\r
10 # distributed under the License is distributed on an "AS IS" BASIS,\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12 # See the License for the specific language governing permissions and\r
13 # limitations under the License.\r
14 \r
15 import json\r
16 import uuid\r
17 import mock\r
18 \r
19 from django.test import TestCase\r
20 from rest_framework import status\r
21 from rest_framework.test import APIClient\r
22 \r
23 from .const import c1_data_get_tenant_id\r
24 from .const import c4_data_create_network\r
25 from .const import c2_data_create_volume\r
26 from .const import c5_data_create_subnet\r
27 from .const import c3_data_get_volume\r
28 from .const import c6_data_create_port\r
29 from .const import c7_data_create_flavor\r
30 from .const import c8_data_list_image\r
31 from .const import c9_data_create_vm\r
32 from .const import c10_data_get_vm\r
33 from .const import inst_req_data\r
34 from .const import vnfpackage_info\r
35 from .const import instantiate_grant_result\r
36 \r
37 from lcm.pub.database.models import NfInstModel\r
38 from lcm.pub.database.models import JobStatusModel\r
39 from lcm.pub.database.models import SubscriptionModel\r
40 from lcm.pub.utils import restcall\r
41 from lcm.pub.utils.jobutil import JobUtil\r
42 from lcm.pub.utils.timeutil import now_time\r
43 from lcm.pub.utils.notificationsutil import NotificationsUtil\r
44 from lcm.pub.vimapi import api\r
45 \r
46 from lcm.nf.biz.instantiate_vnf import InstantiateVnf\r
47 \r
48 \r
49 class TestNFInstantiate(TestCase):\r
50     def setUp(self):\r
51         self.client = APIClient()\r
52         self.grant_result = instantiate_grant_result\r
53 \r
54     def tearDown(self):\r
55         NfInstModel.objects.all().delete()\r
56 \r
57     def assert_job_result(self, job_id, job_progress, job_detail):\r
58         jobs = JobStatusModel.objects.filter(\r
59             jobid=job_id,\r
60             progress=job_progress,\r
61             descp=job_detail\r
62         )\r
63         self.assertEqual(1, len(jobs))\r
64 \r
65     @mock.patch.object(InstantiateVnf, 'run')\r
66     def test_instantiate_vnf(self, mock_run):\r
67         NfInstModel(\r
68             nfinstid='12',\r
69             nf_name='VNF1',\r
70             status='NOT_INSTANTIATED'\r
71         ).save()\r
72         mock_run.re.return_value = None\r
73         response = self.client.post(\r
74             '/api/vnflcm/v1/vnf_instances/12/instantiate',\r
75             data=inst_req_data,\r
76             format='json'\r
77         )\r
78         self.failUnlessEqual(\r
79             status.HTTP_202_ACCEPTED,\r
80             response.status_code\r
81         )\r
82 \r
83     @mock.patch.object(InstantiateVnf, 'run')\r
84     def test_instantiate_vnf_notfound(self, mock_run):\r
85         mock_run.re.return_value = None\r
86         response = self.client.post(\r
87             '/api/vnflcm/v1/vnf_instances/3421/instantiate',\r
88             data=inst_req_data,\r
89             format='json'\r
90         )\r
91         self.failUnlessEqual(\r
92             status.HTTP_404_NOT_FOUND,\r
93             response.status_code,\r
94             response.content\r
95         )\r
96 \r
97     @mock.patch.object(InstantiateVnf, 'run')\r
98     def test_instantiate_vnf_conflict(self, mock_run):\r
99         NfInstModel(\r
100             nfinstid='1123',\r
101             nf_name='VNF1',\r
102             status='INSTANTIATED'\r
103         ).save()\r
104         mock_run.re.return_value = None\r
105         response = self.client.post(\r
106             '/api/vnflcm/v1/vnf_instances/1123/instantiate',\r
107             data=inst_req_data,\r
108             format='json'\r
109         )\r
110         self.failUnlessEqual(\r
111             status.HTTP_409_CONFLICT,\r
112             response.status_code,\r
113             response.content\r
114         )\r
115 \r
116     def test_instantiate_vnf_when_inst_id_not_exist(self):\r
117         self.nf_inst_id = str(uuid.uuid4())\r
118         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
119         JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')\r
120         jobs = JobStatusModel.objects.filter(\r
121             jobid=self.job_id,\r
122             progress=0,\r
123             descp='INST_VNF_READY'\r
124         )\r
125         self.assertEqual(1, len(jobs))\r
126         data = inst_req_data\r
127         InstantiateVnf(\r
128             data,\r
129             nf_inst_id=self.nf_inst_id,\r
130             job_id=self.job_id\r
131         ).run()\r
132         self.assert_job_result(\r
133             self.job_id,\r
134             255,\r
135             'VNF nf_inst_id is not exist.'\r
136         )\r
137 \r
138     def test_instantiate_vnf_when_already_instantiated(self):\r
139         NfInstModel.objects.create(\r
140             nfinstid='1111',\r
141             nf_name='vFW_01',\r
142             package_id='222',\r
143             version='',\r
144             vendor='',\r
145             netype='',\r
146             vnfd_model='',\r
147             status='INSTANTIATED',\r
148             nf_desc='vFW in Nanjing TIC Edge',\r
149             vnfdid='111',\r
150             create_time=now_time()\r
151         )\r
152         self.nf_inst_id = '1111'\r
153         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
154         JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')\r
155         jobs = JobStatusModel.objects.filter(\r
156             jobid=self.job_id,\r
157             progress=0,\r
158             descp='INST_VNF_READY'\r
159         )\r
160         self.assertEqual(1, len(jobs))\r
161         data = inst_req_data\r
162         InstantiateVnf(\r
163             data,\r
164             nf_inst_id=self.nf_inst_id,\r
165             job_id=self.job_id\r
166         ).run()\r
167         self.assert_job_result(\r
168             self.job_id,\r
169             255,\r
170             'VNF instantiationState is not NOT_INSTANTIATED.'\r
171         )\r
172 \r
173     @mock.patch.object(restcall, 'call_req')\r
174     def test_instantiate_vnf_when_get_packageinfo_by_csarid_failed(self, mock_call_req):\r
175         NfInstModel.objects.create(\r
176             nfinstid='1111',\r
177             nf_name='vFW_01',\r
178             package_id='222',\r
179             version='',\r
180             vendor='',\r
181             netype='',\r
182             vnfd_model='',\r
183             status='NOT_INSTANTIATED',\r
184             nf_desc='vFW in Nanjing TIC Edge',\r
185             vnfdid='111',\r
186             create_time=now_time()\r
187         )\r
188         r1_get_vnfpackage_by_vnfdid = [\r
189             1,\r
190             json.JSONEncoder().encode(vnfpackage_info),\r
191             '200'\r
192         ]\r
193         mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid]\r
194         self.nf_inst_id = '1111'\r
195         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
196         JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')\r
197         data = inst_req_data\r
198         InstantiateVnf(\r
199             data,\r
200             nf_inst_id=self.nf_inst_id,\r
201             job_id=self.job_id\r
202         ).run()\r
203         self.assert_job_result(\r
204             self.job_id,\r
205             255,\r
206             'Failed to query vnf CSAR(111) from catalog.'\r
207         )\r
208 \r
209     @mock.patch.object(restcall, 'call_req')\r
210     def test_instantiate_vnf_when_applay_grant_failed(self, mock_call_req):\r
211         NfInstModel.objects.create(\r
212             nfinstid='1111',\r
213             nf_name='vFW_01',\r
214             package_id='222',\r
215             version='',\r
216             vendor='',\r
217             netype='',\r
218             vnfd_model='',\r
219             status='NOT_INSTANTIATED',\r
220             nf_desc='vFW in Nanjing TIC Edge',\r
221             vnfdid='111',\r
222             create_time=now_time()\r
223         )\r
224         r1_get_vnfpackage_by_vnfdid = [\r
225             0,\r
226             json.JSONEncoder().encode(vnfpackage_info),\r
227             '200'\r
228         ]\r
229         r2_apply_grant_result = [\r
230             1,\r
231             json.JSONEncoder().encode(self.grant_result),\r
232             '200'\r
233         ]\r
234         mock_call_req.side_effect = [\r
235             r1_get_vnfpackage_by_vnfdid,\r
236             r2_apply_grant_result\r
237         ]\r
238         self.nf_inst_id = '1111'\r
239         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
240         JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')\r
241         data = inst_req_data\r
242         InstantiateVnf(\r
243             data,\r
244             nf_inst_id=self.nf_inst_id,\r
245             job_id=self.job_id\r
246         ).run()\r
247         self.assert_job_result(\r
248             self.job_id,\r
249             255,\r
250             'Nf instancing apply grant exception'\r
251         )\r
252 \r
253     @mock.patch.object(restcall, 'call_req')\r
254     @mock.patch.object(api, 'call')\r
255     def test_instantiate_vnf_when_unexpected_exception(self, mock_call, mock_call_req):\r
256         NfInstModel.objects.create(\r
257             nfinstid='1111',\r
258             nf_name='vFW_01',\r
259             package_id='222',\r
260             version='',\r
261             vendor='',\r
262             netype='',\r
263             vnfd_model='',\r
264             status='NOT_INSTANTIATED',\r
265             nf_desc='vFW in Nanjing TIC Edge',\r
266             vnfdid='111',\r
267             create_time=now_time()\r
268         )\r
269         r1_get_vnfpackage_by_vnfdid = [\r
270             0,\r
271             json.JSONEncoder().encode(vnfpackage_info),\r
272             '200'\r
273         ]\r
274         r2_apply_grant_result = [\r
275             0,\r
276             json.JSONEncoder().encode(self.grant_result),\r
277             '200'\r
278         ]\r
279         mock_call_req.side_effect = [\r
280             r1_get_vnfpackage_by_vnfdid,\r
281             r2_apply_grant_result\r
282         ]\r
283         mock_call.side_effect = [\r
284             c1_data_get_tenant_id,\r
285             c2_data_create_volume,\r
286             c3_data_get_volume\r
287         ]\r
288         self.nf_inst_id = '1111'\r
289         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
290         JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')\r
291         data = inst_req_data\r
292         InstantiateVnf(\r
293             data,\r
294             nf_inst_id=self.nf_inst_id,\r
295             job_id=self.job_id\r
296         ).run()\r
297         self.assert_job_result(\r
298             self.job_id,\r
299             255,\r
300             'unexpected exception'\r
301         )\r
302 \r
303     @mock.patch.object(restcall, 'call_req')\r
304     @mock.patch.object(api, 'call')\r
305     @mock.patch.object(NotificationsUtil, 'post_notification')\r
306     def test_instantiate_vnf_success(self, mock_post_notification, mock_call, mock_call_req):\r
307         NfInstModel.objects.create(\r
308             nfinstid='1111',\r
309             nf_name='vFW_01',\r
310             package_id='222',\r
311             version='',\r
312             vendor='',\r
313             netype='',\r
314             vnfd_model='',\r
315             status='NOT_INSTANTIATED',\r
316             nf_desc='vFW in Nanjing TIC Edge',\r
317             vnfdid='111',\r
318             create_time=now_time()\r
319         )\r
320         SubscriptionModel.objects.create(\r
321             subscription_id=str(uuid.uuid4()),\r
322             callback_uri='api/gvnfmdriver/v1/vnfs/lifecyclechangesnotification',\r
323             auth_info=json.JSONEncoder().encode({\r
324                 'authType': ['BASIC'],\r
325                 'paramsBasic': {\r
326                     'userName': 'username',\r
327                     'password': 'password'\r
328                 }\r
329             }),\r
330             notification_types=str([\r
331                 'VnfLcmOperationOccurrenceNotification',\r
332                 'VnfIdentifierCreationNotification',\r
333                 'VnfIdentifierDeletionNotification'\r
334             ]),\r
335             operation_types=str(['INSTANTIATE']),\r
336             operation_states=str(['COMPLETED']),\r
337             vnf_instance_filter=json.JSONEncoder().encode({\r
338                 'vnfdIds': ['111'],\r
339                 'vnfProductsFromProviders': [],\r
340                 'vnfInstanceIds': ['1111'],\r
341                 'vnfInstanceNames': [],\r
342             })\r
343         )\r
344         r1_get_vnfpackage_by_vnfdid = [\r
345             0,\r
346             json.JSONEncoder().encode(vnfpackage_info),\r
347             '200'\r
348         ]\r
349         r2_apply_grant_result = [\r
350             0,\r
351             json.JSONEncoder().encode(self.grant_result),\r
352             '200'\r
353         ]\r
354         r3_all_aai_result = [\r
355             1,\r
356             json.JSONEncoder().encode(''),\r
357             '404'\r
358         ]\r
359         r4_lcm_notify_result = [\r
360             0,\r
361             json.JSONEncoder().encode(''),\r
362             '200'\r
363         ]\r
364         mock_call_req.side_effect = [\r
365             r1_get_vnfpackage_by_vnfdid,\r
366             r2_apply_grant_result,\r
367             r3_all_aai_result,\r
368             r4_lcm_notify_result\r
369         ]\r
370         mock_call.side_effect = [\r
371             c1_data_get_tenant_id,\r
372             c2_data_create_volume,\r
373             c3_data_get_volume,\r
374             c4_data_create_network,\r
375             c5_data_create_subnet,\r
376             c6_data_create_port,\r
377             c7_data_create_flavor,\r
378             c8_data_list_image,\r
379             c9_data_create_vm,\r
380             c10_data_get_vm\r
381         ]\r
382         mock_post_notification.side_effect = None\r
383         self.nf_inst_id = '1111'\r
384         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
385         JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')\r
386         data = inst_req_data\r
387         InstantiateVnf(\r
388             data,\r
389             nf_inst_id=self.nf_inst_id,\r
390             job_id=self.job_id\r
391         ).run()\r