e698c4db3ce04e9ec5c532bf1cc9112f7496d329
[vfc/gvnfm/vnflcm.git] / lcm / lcm / nf / tests / test_instantiate_vnf.py
1 # Copyright 2017 ZTE Corporation.\r
2 #\r
3 # Licensed under the Apache License, Version 2.0 (the "License");\r
4 # you may not use this file except in compliance with the License.\r
5 # You may obtain a copy of the License at\r
6 #\r
7 #         http://www.apache.org/licenses/LICENSE-2.0\r
8 #\r
9 # Unless required by applicable law or agreed to in writing, software\r
10 # distributed under the License is distributed on an "AS IS" BASIS,\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12 # See the License for the specific language governing permissions and\r
13 # limitations under the License.\r
14 \r
15 import json\r
16 import uuid\r
17 \r
18 import mock\r
19 from django.test import TestCase\r
20 from rest_framework import status\r
21 from rest_framework.test import APIClient\r
22 \r
23 from lcm.nf.biz.instantiate_vnf import InstantiateVnf\r
24 from lcm.nf.const import c1_data_get_tenant_id, c4_data_create_network, c2_data_create_volume, \\r
25     c5_data_create_subnet, c3_data_get_volume, c6_data_create_port, c7_data_create_flavor, c8_data_list_image, \\r
26     c9_data_create_vm, c10_data_get_vm, inst_req_data, vnfpackage_info\r
27 from lcm.pub.database.models import NfInstModel, JobStatusModel\r
28 from lcm.pub.utils import restcall\r
29 from lcm.pub.utils.jobutil import JobUtil\r
30 from lcm.pub.utils.timeutil import now_time\r
31 from lcm.pub.vimapi import api\r
32 \r
33 \r
class TestNFInstantiate(TestCase):
    """Tests for the VNF instantiate REST endpoint and the InstantiateVnf job.

    The REST test patches ``InstantiateVnf.run`` so that only the HTTP layer
    is exercised.  The remaining tests call ``InstantiateVnf(...).run()``
    directly with ``restcall.call_req`` and/or ``vimapi.api.call`` patched,
    then assert on the ``JobStatusModel`` row the job is expected to write.
    """

    def setUp(self):
        """Create the API client and the grant response shared by the tests."""
        self.client = APIClient()
        self.grant_result = {
            "vimConnections": [
                {
                    "vimid": 'vim_1',
                    "accessInfo":
                    {
                        "tenant": 'chinamobile'
                    }
                },
            ],
            "vnfId": "413aa1fe-b4d1-11e8-8268-dff5aab95c63",
            "vimAssets":
            {
                "vimComputeResourceFlavour": [
                    {
                        "resourceProviderId": "vgw",
                        "vimFlavourId": "yui",
                        "directive": ""
                    },
                ]
            }
        }

    def tearDown(self):
        """Nothing to clean up explicitly."""
        pass

    def assert_job_result(self, job_id, job_progress, job_detail):
        """Assert exactly one job status row matches the given values.

        :param job_id: value matched against ``JobStatusModel.jobid``.
        :param job_progress: value matched against ``JobStatusModel.progress``.
        :param job_detail: value matched against ``JobStatusModel.descp``.
        """
        jobs = JobStatusModel.objects.filter(jobid=job_id,
                                             progress=job_progress,
                                             descp=job_detail)
        self.assertEqual(1, len(jobs))

    # ------------------------------------------------------------------
    # Private fixtures shared by the tests below.
    # ------------------------------------------------------------------
    def _create_nf_inst(self, inst_status):
        """Store the 'vFW_01' NF instance (id '1111') with the given status."""
        NfInstModel.objects.create(nfinstid='1111',
                                   nf_name='vFW_01',
                                   package_id='222',
                                   version='',
                                   vendor='',
                                   netype='',
                                   vnfd_model='',
                                   status=inst_status,
                                   nf_desc='vFW in Nanjing TIC Edge',
                                   vnfdid='111',
                                   create_time=now_time())

    def _start_job(self, nf_inst_id):
        """Create a job for ``nf_inst_id`` and record its initial status.

        Stores the instance id and the new job id on ``self.nf_inst_id`` and
        ``self.job_id`` for the rest of the test.
        """
        self.nf_inst_id = nf_inst_id
        self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
        JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")

    def _run_instantiate(self):
        """Run InstantiateVnf synchronously for the current instance and job."""
        InstantiateVnf(inst_req_data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()

    @staticmethod
    def _rest_rsp(ret_code, body, http_status):
        """Build one canned entry for the mocked ``restcall.call_req``.

        The entry is a three-element list whose middle element is ``body``
        serialised as JSON.  In this file ``ret_code`` 0 is used for calls the
        test expects to be accepted and 1 for calls it expects to be rejected
        (presumably the restcall convention -- confirm against
        ``lcm.pub.utils.restcall``).
        """
        return [ret_code, json.dumps(body), http_status]

    # ------------------------------------------------------------------
    # Tests.
    # ------------------------------------------------------------------
    @mock.patch.object(InstantiateVnf, 'run')
    def test_instantiate_vnf(self, mock_run):
        """POSTing an instantiate request answers 202 Accepted."""
        # NOTE(review): the other tests use 'NOT_INSTANTIATED'; the status
        # value here is kept as originally written -- confirm it is intended.
        NfInstModel(nfinstid='12', nf_name='VNF1', status="UN_INSTANTIATED").save()
        # Configure the patched run() itself; the previous
        # ``mock_run.re.return_value`` only set an unrelated child attribute.
        mock_run.return_value = None
        response = self.client.post("/api/vnflcm/v1/vnf_instances/12/instantiate", data=inst_req_data, format='json')
        self.assertEqual(status.HTTP_202_ACCEPTED, response.status_code)

    def test_instantiate_vnf_when_inst_id_not_exist(self):
        """The job ends at progress 255 when no NF instance row exists."""
        self._start_job(str(uuid.uuid4()))
        self.assert_job_result(self.job_id, 0, "INST_VNF_READY")
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, "VNF nf_inst_id is not exist.")

    def test_instantiate_vnf_when_already_instantiated(self):
        """The job ends at progress 255 when the instance is INSTANTIATED."""
        self._create_nf_inst('INSTANTIATED')
        self._start_job('1111')
        self.assert_job_result(self.job_id, 0, "INST_VNF_READY")
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, "VNF instantiationState is not NOT_INSTANTIATED.")

    @mock.patch.object(restcall, 'call_req')
    def test_instantiate_vnf_when_get_packageinfo_by_csarid_failed(self, mock_call_req):
        """The job ends at progress 255 when the package query is rejected."""
        self._create_nf_inst('NOT_INSTANTIATED')
        r1_get_vnfpackage_by_vnfdid = self._rest_rsp(1, vnfpackage_info, '200')
        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid]
        self._start_job('1111')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, "Failed to query vnf CSAR(111) from catalog.")

    @mock.patch.object(restcall, 'call_req')
    def test_instantiate_vnf_when_applay_grant_failed(self, mock_call_req):
        """The job ends at progress 255 when the grant request is rejected."""
        self._create_nf_inst('NOT_INSTANTIATED')
        r1_get_vnfpackage_by_vnfdid = self._rest_rsp(0, vnfpackage_info, '200')
        r2_apply_grant_result = self._rest_rsp(1, self.grant_result, '200')
        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result]
        self._start_job('1111')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, "Nf instancing apply grant exception")

    @mock.patch.object(restcall, 'call_req')
    @mock.patch.object(api, 'call')
    def test_instantiate_vnf_when_unexpected_exception(self, mock_call, mock_call_req):
        """The job ends at progress 255 with 'unexpected exception'."""
        self._create_nf_inst('NOT_INSTANTIATED')
        r1_get_vnfpackage_by_vnfdid = self._rest_rsp(0, vnfpackage_info, '200')
        r2_apply_grant_result = self._rest_rsp(0, self.grant_result, '200')
        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result]
        # Only three VIM responses are queued here (the success test queues
        # ten); presumably running out of canned responses is what provokes
        # the unexpected exception -- confirm against InstantiateVnf.
        mock_call.side_effect = [c1_data_get_tenant_id, c2_data_create_volume, c3_data_get_volume]
        self._start_job('1111')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 255, "unexpected exception")

    @mock.patch.object(restcall, 'call_req')
    @mock.patch.object(api, 'call')
    def test_instantiate_vnf_success(self, mock_call, mock_call_req):
        """The job reaches progress 100 when every mocked call is answered."""
        self._create_nf_inst('NOT_INSTANTIATED')
        r1_get_vnfpackage_by_vnfdid = self._rest_rsp(0, vnfpackage_info, '200')
        r2_apply_grant_result = self._rest_rsp(0, self.grant_result, '200')
        r3_all_aai_result = self._rest_rsp(1, '', '404')
        r4_lcm_notify_result = self._rest_rsp(0, '', '200')
        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result, r3_all_aai_result, r4_lcm_notify_result]
        mock_call.side_effect = [c1_data_get_tenant_id,
                                 c2_data_create_volume, c3_data_get_volume,
                                 c4_data_create_network,
                                 c5_data_create_subnet,
                                 c6_data_create_port,
                                 c7_data_create_flavor,
                                 c8_data_list_image, c9_data_create_vm, c10_data_get_vm]
        self._start_job('1111')
        self._run_instantiate()
        self.assert_job_result(self.job_id, 100, "Instantiate Vnf success.")
212         self.assert_job_result(self.job_id, 100, "Instantiate Vnf success.")\r