a8df7b37b88331e02499753fc514a7d290b991ff
[vfc/gvnfm/vnflcm.git] / lcm / lcm / nf / vnfs / tests / test_vnf_create.py
1 # Copyright 2017 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 import json
15 import uuid
16
17 import mock
18 from django.test import TestCase, Client
19 from rest_framework import status
20
21 from lcm.nf.vnfs.const import vnfd_rawdata
22 from lcm.nf.vnfs.vnf_create.inst_vnf import InstVnf
23 from lcm.pub.database.models import NfInstModel, JobStatusModel, VmInstModel, NetworkInstModel, \
24     SubNetworkInstModel, PortInstModel
25 from lcm.pub.utils import restcall
26 from lcm.pub.utils.jobutil import JobUtil
27 from lcm.pub.utils.timeutil import now_time
28
29
30 class TestNFInstantiate(TestCase):
31     def setUp(self):
32         self.client = Client()
33         VmInstModel.objects.create(vmid="1", vimid="1", resouceid="11", insttype=0, instid="1", vmname="test_01",
34                                    operationalstate=1)
35         VmInstModel.objects.create(vmid="2", vimid="2", resouceid="22", insttype=0, instid="2",
36                                    vmname="test_02", operationalstate=1)
37         NetworkInstModel.objects.create(networkid='1', vimid='1', resouceid='1', name='pnet_network',
38                                         tenant='admin', insttype=0, instid='1')
39         SubNetworkInstModel.objects.create(subnetworkid='1', vimid='1', resouceid='1', networkid='1',
40                                            name='sub_pnet', tenant='admin', insttype=0, instid='1')
41         PortInstModel.objects.create(portid='1', networkid='1', subnetworkid='1', vimid='1', resouceid='1',
42                                      name='aaa_pnet_cp', tenant='admin', insttype=0, instid='1')
43
44     def tearDown(self):
45         pass
46         VmInstModel.objects.all().delete()
47         NetworkInstModel.objects.all().delete()
48         SubNetworkInstModel.objects.all().delete()
49         PortInstModel.objects.all().delete()
50
51     def assert_job_result(self, job_id, job_progress, job_detail):
52         jobs = JobStatusModel.objects.filter(
53             jobid=job_id,
54             progress=job_progress,
55             descp=job_detail)
56         self.assertEqual(1, len(jobs))
57
58     def test_swagger_ok(self):
59         response = self.client.get("/openoapi/vnflcm/v1/swagger.json", format='json')
60         self.assertEqual(response.status_code, status.HTTP_200_OK)
61
62     @mock.patch.object(restcall, 'call_req')
63     def test_create_vnf_identifier(self, mock_call_req):
64         r1 = [0, json.JSONEncoder().encode({'package_id':'222', 'csar_id':'2222'}), '200']  # get csar_id from nslcm by vnfd_id
65         r2 = [0, json.JSONEncoder().encode(vnfd_rawdata), '200']  # get rawdata from catalog by csar_id
66         mock_call_req.side_effect = [r1, r2]
67         data = {
68             "vnfdId": "111",
69             "vnfInstanceName": "vFW_01",
70             "vnfInstanceDescription": "vFW in Nanjing TIC Edge"}
71         response = self.client.post("/openoapi/vnflcm/v1/vnf_instances", data=data, format='json')
72         self.failUnlessEqual(status.HTTP_201_CREATED, response.status_code)
73         context = json.loads(response.content)
74         self.assertTrue(NfInstModel.objects.filter(nfinstid=context['vnfInstanceId']).exists())
75
76     @mock.patch.object(InstVnf, 'run')
77     def test_instantiate_vnf(self, mock_run):
78         mock_run.re.return_value = None
79         response = self.client.post("/openoapi/vnflcm/v1/vnf_instances/12/instantiate", data={}, format='json')
80         self.failUnlessEqual(status.HTTP_202_ACCEPTED, response.status_code)
81
82     def test_instantiate_vnf_when_inst_id_not_exist(self):
83         self.nf_inst_id = str(uuid.uuid4())
84         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
85         JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
86         data = inst_req_data
87         InstVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
88         self.assert_job_result(self.job_id, 255, "VNF nf_inst_id is not exist.")
89
90     @mock.patch.object(restcall, 'call_req')
91     def test_instantiate_vnf_when_get_package_info_by_vnfdid_failed(self, mock_call_req):
92         NfInstModel.objects.create(nfinstid='1111', nf_name='vFW_01', package_id='222',
93                                    version='', vendor='', netype='', vnfd_model='', status='NOT_INSTANTIATED',
94                                    nf_desc='vFW in Nanjing TIC Edge', vnfdid='111', create_time=now_time())
95         r1 = [1, json.JSONEncoder().encode({'package_id':'222', 'csar_id':'2222'}), '200']  # get csar_id from nslcm by vnfd_id
96         mock_call_req.side_effect = [r1]
97         self.nf_inst_id = '1111'
98         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
99         JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
100         data = inst_req_data
101         InstVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
102         self.assert_job_result(self.job_id, 255, "Failed to query package_info of vnfdid(111) from nslcm.")
103
104     @mock.patch.object(restcall, 'call_req')
105     def test_instantiate_vnf_when_get_rawdata_by_csarid_failed(self, mock_call_req):
106         NfInstModel.objects.create(nfinstid='1111', nf_name='vFW_01', package_id='222',
107                                    version='', vendor='', netype='', vnfd_model='', status='NOT_INSTANTIATED',
108                                    nf_desc='vFW in Nanjing TIC Edge', vnfdid='111', create_time=now_time())
109         r1 = [0, json.JSONEncoder().encode({'package_id':'222', 'csar_id':'2222'}), '200']  # get csar_id from nslcm by vnfd_id
110         r2 = [1, json.JSONEncoder().encode(vnfd_rawdata), '200']  # get rawdata from catalog by csar_id
111         mock_call_req.side_effect = [r1, r2]
112         self.nf_inst_id = '1111'
113         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
114         JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
115         data = inst_req_data
116         InstVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
117         self.assert_job_result(self.job_id, 255, "Failed to query rawdata of CSAR(2222) from catalog.")
118
119     @mock.patch.object(restcall, 'call_req')
120     def test_instantiate_vnf_when_applay_grant_failed(self, mock_call_req):
121         NfInstModel.objects.create(nfinstid='1111', nf_name='vFW_01', package_id='222',
122                                    version='', vendor='', netype='', vnfd_model='', status='NOT_INSTANTIATED',
123                                    nf_desc='vFW in Nanjing TIC Edge', vnfdid='111', create_time=now_time())
124         r1 = [0, json.JSONEncoder().encode({'package_id': '222', 'csar_id': '2222'}), '200']  # get csar_id from nslcm by vnfd_id
125         r2 = [0, json.JSONEncoder().encode(vnfd_rawdata), '200']  # get rawdata from catalog by csar_id
126         r3 = [1, json.JSONEncoder().encode({"vim":
127                                                 {
128                                                     "vimid": '1',
129                                                     "accessinfo": {"tenant": '2'}
130                                                  }
131                                             }), '200']  # apply_grant_to_nfvo
132         mock_call_req.side_effect = [r1, r2, r3]
133         self.nf_inst_id = '1111'
134         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
135         JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
136         data = inst_req_data
137         InstVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
138         self.assert_job_result(self.job_id, 255, "Nf instancing apply grant exception")
139
140
141
142
143
144
145     # @mock.patch.object(restcall, 'call_req')
146     # def test_instantiate_vnf_when_(self, mock_call_req):
147     #     NfInstModel.objects.create(nfinstid='1111', nf_name='vFW_01', package_id='222',
148     #                                version='', vendor='', netype='', vnfd_model='', status='NOT_INSTANTIATED',
149     #                                nf_desc='vFW in Nanjing TIC Edge', vnfdid='111', create_time=now_time())
150     #     r1 = [0, json.JSONEncoder().encode({'package_id': '222', 'csar_id': '2222'}),
151     #           '200']  # get csar_id from nslcm by vnfd_id
152     #     r2 = [0, json.JSONEncoder().encode(vnfd_raw_data), '200']  # get rawdata from catalog by csar_id
153     #     r3 = [0, json.JSONEncoder().encode({"vim":{"vimid": '1', "accessinfo": {"tenant": '2'}}}), '200']  # apply_grant_to_nfvo
154     #     mock_call_req.side_effect = [r1, r2, r3]
155     #     self.nf_inst_id = '1111'
156     #     self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
157     #     JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
158     #     data = inst_req_data
159     #     InstVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
160     #     self.assert_job_result(self.job_id, 255, "Nf instancing apply grant exception")
161
162
163
164     # @mock.patch.object(restcall, 'call_req')
165     # # @mock.patch.object(adaptor, 'create_vim_res')
166     # def test_instantiate_vnf_when_create_res_failed(self, mock_call_req):
167     #     NfvoRegInfoModel.objects.create(nfvoid='nfvo111', vnfminstid='vnfm111', apiurl='http://10.74.44.11',
168     #                                     nfvouser='root', nfvopassword='root123')
169     #     r1 = [0, json.JSONEncoder().encode(vnfd_model_dict), '200']
170     #     r2 = [0, json.JSONEncoder().encode(vnfd_model_dict), '200']
171     #     r3 = [0, json.JSONEncoder().encode('Nf instancing apply grant'), '200']
172     #     # r4 = [0, json.JSONEncoder().encode('Nf instancing apply resource'), '200']
173     #     mock_call_req.side_effect = [r1, r2, r3]
174     #     # mock_create_vim_res.re.return_value = None
175     #     create_data = {
176     #         "vnfdId": "111",
177     #         "vnfInstanceName": "vFW_01",
178     #         "vnfInstanceDescription": " vFW in Nanjing TIC Edge"}
179     #     self.nf_inst_id = CreateVnf(create_data).do_biz()
180     #     self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
181     #     JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
182     #     data = inst_req_data
183     #     InstVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
184     #     self.assert_job_result(self.job_id, 255, "Create resource failed")
185
186     # @mock.patch.object(restcall, 'call_req')
187     # # @mock.patch.object(adaptor, 'create_vim_res')
188     # def test_instantiate_vnf_success(self, mock_call_req):
189     #     NfvoRegInfoModel.objects.create(nfvoid='nfvo111', vnfminstid='vnfm111', apiurl='http://10.74.44.11',
190     #                                     nfvouser='root', nfvopassword='root123')
191     #     r1 = [0, json.JSONEncoder().encode(vnfd_model_dict), '200']
192     #     r2 = [0, json.JSONEncoder().encode(vnfd_model_dict), '200']
193     #     r3 = [0, json.JSONEncoder().encode('Nf instancing apply grant'), '200']
194     #     r4 = [0, json.JSONEncoder().encode('None'), '200']
195     #     mock_call_req.side_effect = [r1, r2, r3, r4]
196     #     # mock_create_vim_res.re.return_value = None
197     #     create_data = {
198     #         "vnfdId": "111",
199     #         "vnfInstanceName": "vFW_01",
200     #         "vnfInstanceDescription": " vFW in Nanjing TIC Edge"}
201     #     self.nf_inst_id = CreateVnf(create_data).do_biz()
202     #     self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)
203     #     JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")
204     #     data = inst_req_data
205     #     InstVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()
206     #     self.assert_job_result(self.job_id, 100, "Instantiate Vnf success.")
207
208 inst_req_data = {
209     "flavourId": "flavour_1",
210     "instantiationLevelId": "instantiationLevel_1",
211     "extVirtualLinks": [
212         {
213             "vlInstanceId": "1",
214             "vim": {
215                 "vimInfoId": "1",
216                 "vimId": "1",
217                 "interfaceInfo": {
218                     "vimType": "vim",
219                     "apiVersion": "v2",
220                     "protocolType": "http"
221                 },
222                 "accessInfo": {
223                     "tenant": "tenant_vCPE",
224                     "username": "vCPE",
225                     "password": "vCPE_321"
226                 },
227                 "interfaceEndpoint": "http://10.43.21.105:80/"
228             },
229             "resourceId": "1246",
230             "extCps": [
231                 {
232                     "cpdId": "11",
233                     "addresses": [
234                         {
235                             "addressType": "MAC",
236                             "l2AddressData": "00:f3:43:20:a2:a3"
237                         },
238                         {
239                             "addressType": "IP",
240                             "l3AddressData": {
241                                 "iPAddressType": "IPv4",
242                                 "iPAddress": "192.168.104.2"
243                             }
244                         }
245                     ],
246                     "numDynamicAddresses": 0
247                 }
248             ]
249         }
250     ],
251     "localizationLanguage": "en_US",
252     "additionalParams": {}
253 }