Change gvnfm-vnflcm aai config
[vfc/gvnfm/vnflcm.git] / lcm / lcm / nf / vnfs / tests / test_vnf_create.py
1 # Copyright 2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15
16 import json
17 import uuid
18
19 import mock
20 from django.test import TestCase, Client
21 from rest_framework import status
22
23 from lcm.nf.vnfs.const import vnfd_rawdata, c1_data_get_tenant_id, c4_data_create_network, c2_data_create_volume, \
24     c5_data_create_subnet, c3_data_get_volume, c6_data_create_port, c7_data_create_flavor, c8_data_list_image, \
25     c9_data_create_vm, c10_data_get_vm, inst_req_data
26 from lcm.nf.vnfs.vnf_create.inst_vnf import InstVnf
27 from lcm.pub.database.models import NfInstModel, JobStatusModel
28 from lcm.pub.utils import restcall
29 from lcm.pub.utils.jobutil import JobUtil
30 from lcm.pub.utils.timeutil import now_time
31 from lcm.pub.vimapi import api
32
33
class TestNFInstantiate(TestCase):
    """Tests for creating a VNF identifier and for the InstVnf instantiate job.

    Outbound REST calls (``restcall.call_req``) and VIM calls (``api.call``)
    are replaced with mocks whose ``side_effect`` lists supply one canned
    result per expected call, in call order.
    """

    # Identifier of the NfInstModel row seeded by _create_nf_inst().
    NF_INST_ID = '1111'

    def setUp(self):
        """Create the HTTP test client used by the REST-level tests."""
        self.client = Client()

    def tearDown(self):
        """Nothing to clean up explicitly."""
        pass

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def assert_job_result(self, job_id, job_progress, job_detail):
        """Assert exactly one job status row matches id, progress and description."""
        jobs = JobStatusModel.objects.filter(jobid=job_id,
                                             progress=job_progress,
                                             descp=job_detail)
        self.assertEqual(1, len(jobs))

    @staticmethod
    def _rest_response(body, failed=False):
        """Build a canned ``restcall.call_req`` result.

        The result is ``[ret, json_body, '200']`` where ``ret`` is 1 when
        ``failed`` is true and 0 otherwise. The tests use ``ret == 1`` to
        drive the failure branches (presumably non-zero means the call
        failed -- confirm against restcall).
        """
        return [1 if failed else 0, json.dumps(body), '200']

    def _csar_query_response(self, failed=False):
        """Canned reply for the package-info-by-vnfdid query."""
        body = {
            'csars': [
                {
                    'package_id': '222',
                    'csarId': '2222',
                    'vnfdId': '111'
                }
            ]
        }
        return self._rest_response(body, failed)

    def _rawdata_response(self, failed=False):
        """Canned reply for the CSAR rawdata query, carrying ``vnfd_rawdata``."""
        return self._rest_response(vnfd_rawdata, failed)

    def _grant_response(self, failed=False):
        """Canned reply for the apply-grant request."""
        body = {
            "vim": {
                "vimid": 'vimid_1',
                "accessinfo": {
                    "tenant": 'tenantname_1'
                }
            }
        }
        return self._rest_response(body, failed)

    def _create_nf_inst(self):
        """Seed the NOT_INSTANTIATED NfInstModel row the instantiate tests act on."""
        NfInstModel.objects.create(nfinstid=self.NF_INST_ID,
                                   nf_name='vFW_01',
                                   package_id='222',
                                   version='',
                                   vendor='',
                                   netype='',
                                   vnfd_model='',
                                   status='NOT_INSTANTIATED',
                                   nf_desc='vFW in Nanjing TIC Edge',
                                   vnfdid='111',
                                   create_time=now_time())

    def _prepare_job(self, nf_inst_id):
        """Create a job for ``nf_inst_id`` and record its initial status row.

        Stores the ids on ``self.nf_inst_id`` and ``self.job_id``.
        """
        self.nf_inst_id = nf_inst_id
        self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
        JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")

    def _run_inst_vnf(self):
        """Run InstVnf synchronously for the job set up by _prepare_job()."""
        InstVnf(inst_req_data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()

    # ------------------------------------------------------------------
    # REST-level tests
    # ------------------------------------------------------------------
    def test_swagger_ok(self):
        """The swagger document is served with HTTP 200."""
        response = self.client.get("/api/vnflcm/v1/swagger.json", format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)

    @mock.patch.object(restcall, 'call_req')
    def test_create_vnf_identifier(self, mock_call_req):
        """POSTing a create request returns 201 and persists an NfInstModel row."""
        r3_create_vnf_to_aai = self._rest_response({})
        mock_call_req.side_effect = [self._csar_query_response(),
                                     self._rawdata_response(),
                                     r3_create_vnf_to_aai]
        data = {
            "vnfdId": "111",
            "vnfInstanceName": "vFW_01",
            "vnfInstanceDescription": "vFW in Nanjing TIC Edge"
        }
        response = self.client.post("/api/vnflcm/v1/vnf_instances", data=data, format='json')
        self.assertEqual(status.HTTP_201_CREATED, response.status_code)
        context = json.loads(response.content)
        self.assertTrue(NfInstModel.objects.filter(nfinstid=context['vnfInstanceId']).exists())

    @mock.patch.object(InstVnf, 'run')
    def test_instantiate_vnf(self, mock_run):
        """The instantiate endpoint answers 202 while InstVnf.run is stubbed out."""
        # Configure the patched run() itself; the previous ``mock_run.re``
        # only configured an unrelated auto-created child mock.
        mock_run.return_value = None
        response = self.client.post("/api/vnflcm/v1/vnf_instances/12/instantiate", data={}, format='json')
        self.assertEqual(status.HTTP_202_ACCEPTED, response.status_code)

    # ------------------------------------------------------------------
    # InstVnf job tests
    # ------------------------------------------------------------------
    def test_instantiate_vnf_when_inst_id_not_exist(self):
        """Running against an unknown instance id ends the job at progress 255."""
        self._prepare_job(str(uuid.uuid4()))
        # The initial status row must be in place before the job runs.
        self.assert_job_result(self.job_id, 0, "INST_VNF_READY")
        self._run_inst_vnf()
        self.assert_job_result(self.job_id, 255, "VNF nf_inst_id is not exist.")

    @mock.patch.object(restcall, 'call_req')
    def test_instantiate_vnf_when_get_package_info_by_vnfdid_failed(self, mock_call_req):
        """A failing package-info query is reported on the job."""
        self._create_nf_inst()
        mock_call_req.side_effect = [self._csar_query_response(failed=True)]
        self._prepare_job(self.NF_INST_ID)
        self._run_inst_vnf()
        self.assert_job_result(self.job_id, 255, "Failed to query package_info of vnfdid(111) from nslcm.")

    @mock.patch.object(restcall, 'call_req')
    def test_instantiate_vnf_when_get_rawdata_by_csarid_failed(self, mock_call_req):
        """A failing rawdata query is reported on the job."""
        self._create_nf_inst()
        mock_call_req.side_effect = [self._csar_query_response(),
                                     self._rawdata_response(failed=True)]
        self._prepare_job(self.NF_INST_ID)
        self._run_inst_vnf()
        self.assert_job_result(self.job_id, 255, "Failed to query rawdata of CSAR(2222) from catalog.")

    @mock.patch.object(restcall, 'call_req')
    def test_instantiate_vnf_when_applay_grant_failed(self, mock_call_req):
        """A failing apply-grant call is reported on the job."""
        self._create_nf_inst()
        mock_call_req.side_effect = [self._csar_query_response(),
                                     self._rawdata_response(),
                                     self._grant_response(failed=True)]
        self._prepare_job(self.NF_INST_ID)
        self._run_inst_vnf()
        self.assert_job_result(self.job_id, 255, "Nf instancing apply grant exception")

    @mock.patch.object(restcall, 'call_req')
    @mock.patch.object(api, 'call')
    def test_instantiate_vnf_when_unexpected_exception(self, mock_call, mock_call_req):
        """Running out of canned VIM replies surfaces as "unexpected exception"."""
        self._create_nf_inst()
        mock_call_req.side_effect = [self._csar_query_response(),
                                     self._rawdata_response(),
                                     self._grant_response()]
        # Only the first three VIM replies are supplied; any further
        # api.call makes the mock raise once the list is exhausted.
        mock_call.side_effect = [c1_data_get_tenant_id, c2_data_create_volume, c3_data_get_volume]
        self._prepare_job(self.NF_INST_ID)
        self._run_inst_vnf()
        self.assert_job_result(self.job_id, 255, "unexpected exception")

    @mock.patch.object(restcall, 'call_req')
    @mock.patch.object(api, 'call')
    def test_instantiate_vnf_success(self, mock_call, mock_call_req):
        """With every REST and VIM reply supplied the job finishes at progress 100."""
        self._create_nf_inst()
        r4_lcm_notify_result = self._rest_response('')
        mock_call_req.side_effect = [self._csar_query_response(),
                                     self._rawdata_response(),
                                     self._grant_response(),
                                     r4_lcm_notify_result]
        mock_call.side_effect = [c1_data_get_tenant_id, c2_data_create_volume, c3_data_get_volume,
                                 c4_data_create_network, c5_data_create_subnet, c6_data_create_port,
                                 c7_data_create_flavor, c8_data_list_image, c9_data_create_vm, c10_data_get_vm]
        self._prepare_job(self.NF_INST_ID)
        self._run_inst_vnf()
        self.assert_job_result(self.job_id, 100, "Instantiate Vnf success.")
286         self.assert_job_result(self.job_id, 100, "Instantiate Vnf success.")