d52787b9a26a705c3c083183884bd1240ce9c83f
[vfc/gvnfm/vnflcm.git] / lcm / lcm / nf / tests / test_instantiate_vnf.py
1 # Copyright 2017 ZTE Corporation.\r
2 #\r
3 # Licensed under the Apache License, Version 2.0 (the "License");\r
4 # you may not use this file except in compliance with the License.\r
5 # You may obtain a copy of the License at\r
6 #\r
7 #         http://www.apache.org/licenses/LICENSE-2.0\r
8 #\r
9 # Unless required by applicable law or agreed to in writing, software\r
10 # distributed under the License is distributed on an "AS IS" BASIS,\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12 # See the License for the specific language governing permissions and\r
13 # limitations under the License.\r
14 \r
15 import json\r
16 import uuid\r
17 \r
18 import mock\r
19 from django.test import TestCase\r
20 from rest_framework import status\r
21 from rest_framework.test import APIClient\r
22 \r
23 from lcm.nf.biz.instantiate_vnf import InstantiateVnf\r
24 from lcm.nf.const import c1_data_get_tenant_id, c4_data_create_network, c2_data_create_volume, \\r
25     c5_data_create_subnet, c3_data_get_volume, c6_data_create_port, c7_data_create_flavor, c8_data_list_image, \\r
26     c9_data_create_vm, c10_data_get_vm, inst_req_data, vnfpackage_info\r
27 from lcm.pub.database.models import NfInstModel, JobStatusModel, SubscriptionModel\r
28 from lcm.pub.utils import restcall\r
29 from lcm.pub.utils.jobutil import JobUtil\r
30 from lcm.pub.utils.timeutil import now_time\r
31 from lcm.pub.utils.notificationsutil import NotificationsUtil\r
32 from lcm.pub.vimapi import api\r
33 \r
34 \r
class TestNFInstantiate(TestCase):
    """Tests for the VNF instantiate REST endpoint and the ``InstantiateVnf`` job.

    Each job-level test stores an ``NfInstModel`` row (where needed), creates a
    job through ``JobUtil``, runs ``InstantiateVnf(...).run()`` synchronously
    and then checks the ``JobStatusModel`` row written for that job.
    """

    def setUp(self):
        """Create the API client and the canned grant payload used by the tests."""
        self.client = APIClient()
        # Body returned by the mocked ``restcall.call_req`` for the grant request.
        self.grant_result = {
            "vimConnections": [
                {
                    "vimid": 'vim_1',
                    "accessInfo":
                    {
                        "tenant": 'chinamobile'
                    }
                },
            ],
            "vnfId": "413aa1fe-b4d1-11e8-8268-dff5aab95c63",
            "vimAssets":
            {
                "computeResourceFlavours": [
                    {
                        "resourceProviderId": "vgw",
                        "vimFlavourId": "yui",
                        "vimConnectionId": ""
                    },
                ]
            }
        }

    def tearDown(self):
        pass

    def assert_job_result(self, job_id, job_progress, job_detail):
        """Assert exactly one job status row matches the id, progress and description."""
        jobs = JobStatusModel.objects.filter(jobid=job_id,
                                             progress=job_progress,
                                             descp=job_detail)
        self.assertEqual(1, len(jobs))

    def _create_nf_inst(self, inst_status):
        """Store the shared 'vFW_01' NF instance ('1111') with the given status."""
        NfInstModel.objects.create(nfinstid='1111',
                                   nf_name='vFW_01',
                                   package_id='222',
                                   version='',
                                   vendor='',
                                   netype='',
                                   vnfd_model='',
                                   status=inst_status,
                                   nf_desc='vFW in Nanjing TIC Edge',
                                   vnfdid='111',
                                   create_time=now_time())

    def _start_job(self, nf_inst_id):
        """Create a job for ``nf_inst_id`` and record its initial status row.

        Sets ``self.nf_inst_id`` and ``self.job_id`` for the rest of the test.
        """
        self.nf_inst_id = nf_inst_id
        self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
        JobUtil.add_job_status(self.job_id, 0, 'INST_VNF_READY')

    def _run_instantiate(self):
        """Run the instantiate job synchronously with the canned request data."""
        InstantiateVnf(inst_req_data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()

    @mock.patch.object(InstantiateVnf, 'run')
    def test_instantiate_vnf(self, mock_run):
        """POSTing to the instantiate endpoint answers 202 while ``run`` is mocked."""
        NfInstModel(nfinstid='12', nf_name='VNF1', status='UN_INSTANTIATED').save()
        # Configure the patched ``run`` itself (not a child attribute of the mock).
        mock_run.return_value = None
        response = self.client.post('/api/vnflcm/v1/vnf_instances/12/instantiate', data=inst_req_data, format='json')
        self.assertEqual(status.HTTP_202_ACCEPTED, response.status_code)

    def test_instantiate_vnf_when_inst_id_not_exist(self):
        """Running against an id with no ``NfInstModel`` row ends the job at 255."""
        self._start_job(str(uuid.uuid4()))
        self.assert_job_result(self.job_id, 0, 'INST_VNF_READY')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, 'VNF nf_inst_id is not exist.')

    def test_instantiate_vnf_when_already_instantiated(self):
        """An instance stored with status 'INSTANTIATED' ends the job at 255."""
        self._create_nf_inst('INSTANTIATED')
        self._start_job('1111')
        self.assert_job_result(self.job_id, 0, 'INST_VNF_READY')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, 'VNF instantiationState is not NOT_INSTANTIATED.')

    @mock.patch.object(restcall, 'call_req')
    def test_instantiate_vnf_when_get_packageinfo_by_csarid_failed(self, mock_call_req):
        """The job ends at 255 when the package query result starts with 1."""
        self._create_nf_inst('NOT_INSTANTIATED')
        # Leading element 1: presumably the failure code of restcall.call_req.
        r1_get_vnfpackage_by_vnfdid = [1, json.dumps(vnfpackage_info), '200']
        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid]
        self._start_job('1111')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, 'Failed to query vnf CSAR(111) from catalog.')

    @mock.patch.object(restcall, 'call_req')
    def test_instantiate_vnf_when_applay_grant_failed(self, mock_call_req):
        """The job ends at 255 when the second REST result (the grant) starts with 1."""
        self._create_nf_inst('NOT_INSTANTIATED')
        r1_get_vnfpackage_by_vnfdid = [0, json.dumps(vnfpackage_info), '200']
        r2_apply_grant_result = [1, json.dumps(self.grant_result), '200']
        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result]
        self._start_job('1111')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, 'Nf instancing apply grant exception')

    @mock.patch.object(restcall, 'call_req')
    @mock.patch.object(api, 'call')
    def test_instantiate_vnf_when_unexpected_exception(self, mock_call, mock_call_req):
        """The job ends at 255 when the mocked VIM API runs out of responses."""
        self._create_nf_inst('NOT_INSTANTIATED')
        r1_get_vnfpackage_by_vnfdid = [0, json.dumps(vnfpackage_info), '200']
        r2_apply_grant_result = [0, json.dumps(self.grant_result), '200']
        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result]
        # Only three VIM responses are queued; any further ``api.call`` raises
        # StopIteration from the mock.
        mock_call.side_effect = [c1_data_get_tenant_id, c2_data_create_volume, c3_data_get_volume]
        self._start_job('1111')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, 'unexpected exception')

    @mock.patch.object(restcall, 'call_req')
    @mock.patch.object(api, 'call')
    @mock.patch.object(NotificationsUtil, 'post_notification')
    def test_instantiate_vnf_success(self, mock_post_notification, mock_call, mock_call_req):
        """Run the full instantiate flow with every REST and VIM call mocked."""
        self._create_nf_inst('NOT_INSTANTIATED')
        SubscriptionModel.objects.create(
            subscription_id=str(uuid.uuid4()),
            callback_uri='api/gvnfmdriver/v1/vnfs/lifecyclechangesnotification',
            auth_info=json.dumps({
                'authType': ['BASIC'],
                'paramsBasic': {
                    'userName': 'username',
                    'password': 'password'
                }
            }),
            notification_types=str([
                'VnfLcmOperationOccurrenceNotification',
                'VnfIdentifierCreationNotification',
                'VnfIdentifierDeletionNotification'
            ]),
            operation_types=str(['INSTANTIATE']),
            operation_states=str(['COMPLETED']),
            vnf_instance_filter=json.dumps({
                'vnfdIds': ['111'],
                'vnfProductsFromProviders': [],
                'vnfInstanceIds': ['1111'],
                'vnfInstanceNames': [],
            })
        )
        r1_get_vnfpackage_by_vnfdid = [0, json.dumps(vnfpackage_info), '200']
        r2_apply_grant_result = [0, json.dumps(self.grant_result), '200']
        r3_all_aai_result = [1, json.dumps(''), '404']
        r4_lcm_notify_result = [0, json.dumps(''), '200']
        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result, r3_all_aai_result, r4_lcm_notify_result]
        mock_call.side_effect = [c1_data_get_tenant_id,
                                 c2_data_create_volume, c3_data_get_volume,
                                 c4_data_create_network,
                                 c5_data_create_subnet,
                                 c6_data_create_port,
                                 c7_data_create_flavor,
                                 c8_data_list_image, c9_data_create_vm, c10_data_get_vm]
        mock_post_notification.side_effect = None
        self._start_job('1111')
        self._run_instantiate()
        # TODO(review): the final-state assertion below is disabled, so this test
        # currently only checks that run() returns without raising. Re-enable it
        # once the expected final job status is confirmed.
        # self.assert_job_result(self.job_id, 100, 'Instantiate Vnf success.')