1 # Copyright (C) 2019 ZTE. All Rights Reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from django.test import TestCase
18 from rest_framework import status
19 from rest_framework.test import APIClient
21 from lcm.pub.database.models import NfInstModel, JobStatusModel, StorageInstModel, NetworkInstModel, \
22 SubNetworkInstModel, PortInstModel, FlavourInstModel, VmInstModel, VNFCInstModel
23 from lcm.pub.exceptions import NFLCMException
24 from lcm.pub.utils import restcall
25 from lcm.pub.vimapi import api
26 from lcm.pub.utils.jobutil import JobUtil
27 from lcm.nf.biz.change_ext_conn import ChangeExtConn
31 class TestChangeExtConn(TestCase):
33 self.client = APIClient()
34 NfInstModel(nfinstid='12345',
40 vnfSoftwareVersion="V1",
43 status='NOT_INSTANTIATED').save()
44 NfInstModel(nfinstid='123',
50 vnfSoftwareVersion="V1",
53 status='INSTANTIATED').save()
57 "resourceId": "329efb86-5cbb-4fc0-bc7c-6ea28f9d7389",
58 "resourceSubnetId": "429efb86-5cbb-4fc0-bc7c-6ea28f9d7389",
64 "layerProtocol": "IP_OVER_ETHERNET",
68 "numDynamicAddresses": 0,
69 "subnetId": "59e9ffa9-b67e-4c05-b191-ed179007536e"
77 "vimConnectionInfo": [{
78 "id": "tecs_RegionOne",
79 "vimType": "openstack",
80 "vimId": "tecs_RegionOne",
82 "tenant": "chinamobile"
86 "vmid": "552ea058-6441-4de5-b4c1-b0a52c7557e8"
91 NfInstModel.objects.filter(nfinstid='12345').delete()
92 NfInstModel.objects.filter(nfinstid='123').delete()
94 def assert_job_result(self, job_id, job_progress, job_detail):
95 jobs = JobStatusModel.objects.filter(
97 progress=job_progress,
100 self.assertEqual(1, len(jobs))
102 def test_change_ext_conn_not_found(self):
103 url = "/api/vnflcm/v1/vnf_instances/12/change_ext_conn"
104 response = self.client.post(url,
107 self.assertEqual(status.HTTP_404_NOT_FOUND, response.status_code)
109 def test_change_ext_conn_conflict(self):
110 url = "/api/vnflcm/v1/vnf_instances/12345/change_ext_conn"
111 response = self.client.post(url,
114 self.assertEqual(status.HTTP_409_CONFLICT, response.status_code)
116 def test_change_ext_conn_badreq(self):
117 url = "/api/vnflcm/v1/vnf_instances/123/change_ext_conn"
118 response = self.client.post(url,
121 self.assertEqual(status.HTTP_400_BAD_REQUEST, response.status_code)
123 @mock.patch.object(JobUtil, 'create_job')
124 def test_change_ext_conn_inner_error(self, mock_run):
125 mock_run.return_value = NFLCMException('Boom!')
126 url = "/api/vnflcm/v1/vnf_instances/123/change_ext_conn"
127 response = self.client.post(url,
131 status.HTTP_500_INTERNAL_SERVER_ERROR,
132 response.status_code)
134 @mock.patch.object(restcall, 'call_req')
135 @mock.patch.object(api, 'call')
136 def test_change_ext_conn_sucess(self, mock_call, mock_call_req):
137 self.nf_inst_id = '12345'
138 res_cache = {"volume": {}, "flavor": {}, "port": {}}
139 res_cache["port"]["ext_cp"] = "port1"
140 NfInstModel(nfinstid=self.nf_inst_id,
146 vnfSoftwareVersion="V1",
149 status='INSTANTIATED',
150 vnfd_model=json.dumps(const.vnfd_for_scale),
151 vimInfo=json.dumps({}),
152 resInfo=json.dumps(res_cache)).save()
153 StorageInstModel.objects.create(
158 instid=self.nf_inst_id,
161 NetworkInstModel.objects.create(
169 instid=self.nf_inst_id
171 SubNetworkInstModel.objects.create(
180 instid=self.nf_inst_id
182 PortInstModel.objects.create(
192 instid=self.nf_inst_id
194 FlavourInstModel.objects.create(
198 instid=self.nf_inst_id,
200 name="Flavor_sunshine"
202 VmInstModel.objects.create(
207 instid=self.nf_inst_id,
212 VmInstModel.objects.create(
217 instid=self.nf_inst_id,
222 VNFCInstModel.objects.create(
224 instid=self.nf_inst_id,
227 VNFCInstModel.objects.create(
229 instid=self.nf_inst_id,
232 r1_apply_grant_result = [
234 json.JSONEncoder().encode(const.instantiate_grant_result),
237 mock_call_req.side_effect = [
238 r1_apply_grant_result,
240 mock_call.side_effect = [
241 const.c1_data_get_tenant_id,
242 const.c7_data_create_flavor,
243 const.c6_data_create_port
245 self.job_id = JobUtil.create_job('NF', 'VNF_CHANGE_EXT_CONN', self.nf_inst_id)
246 JobUtil.add_job_status(self.job_id, 0, "VNF_'VNF_CHANGE_EXT_CONN'_READY")
248 ChangeExtConn(self.req_data, self.nf_inst_id, self.job_id,).run()
250 print([{job.progress: job.descp} for job in JobStatusModel.objects.filter(jobid=self.job_id)])
251 self.assert_job_result(
254 'Change ext conn success.'