1 # Copyright 2018 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from rest_framework.test import APIClient
21 from django.test import TestCase
22 from rest_framework import status
23 from catalog.pub.config.config import CATALOG_ROOT_PATH
24 from catalog.packages.biz.vnf_package import VnfPkgUploadThread
25 from catalog.pub.database.models import VnfPackageModel
26 from catalog.pub.utils import toscaparser
37 class TestVnfPackage(TestCase):
39 self.client = APIClient()
48 "unit_size": 1000000000
50 "type_of_storage": "volume",
51 "rdma_enabled": False,
54 "volume_storage_id": "vNAT_Storage_6wdgwzedlb6sq18uzrr41sof7",
62 "vNAT_Storage_6wdgwzedlb6sq18uzrr41sof7"
69 "configurable_properties": {
71 "additional_vnfc_configurable_properties": {
78 "description": "the virtual machine of vNat",
89 "artifact_name": "vNatVNFImage",
90 "type": "tosca.artifacts.nfv.SwImage",
92 "operating_system": "linux",
93 "sw_image": "/swimages/vRouterVNF_ControlPlane.qcow2",
94 "name": "vNatVNFImage",
95 "container_format": "bare",
97 "disk_format": "qcow2",
98 "supported_virtualisation_environments": [
106 "file": "/swimages/vRouterVNF_ControlPlane.qcow2"
110 "flavor_extra_specs": {
111 "hw:cpu_sockets": "2",
112 "sw:ovs_dpdk": "true",
113 "hw:cpu_threads": "2",
114 "hw:numa_mem.1": "3072",
115 "hw:numa_mem.0": "1024",
116 "hw:numa_nodes": "2",
117 "hw:numa_cpus.0": "0,1",
118 "hw:numa_cpus.1": "2,3,4,5",
120 "hw:cpu_threads_policy": "isolate"
122 "cpu_frequency": "2.4 GHz",
126 "local_storages": [],
127 "image_file": "vNatVNFImage",
134 "operating_system": "linux",
135 "sw_image": "/swimages/vRouterVNF_ControlPlane.qcow2",
136 "name": "vNatVNFImage",
137 "container_format": "bare",
139 "disk_format": "qcow2",
140 "supported_virtualisation_environments": [
148 "image_file_id": "vNatVNFImage",
153 "local_storages": [],
157 "key_name": "sriov_plane",
158 "cp_id": "SRIOV_Port"
166 "vl_id": "sriov_link",
167 "route_external": False,
173 "connectivity_type": {
174 "layer_protocol": "ipv4",
175 "flow_pattern": "flat"
177 "description": "sriov_link",
186 "vl_id": "sriov_link",
187 "vdu_id": "vdu_vNat",
189 "cp_id": "SRIOV_Port",
193 "address_type": "ip_address",
195 "ip_address_type": "ipv4",
196 "floating_ip_activated": False,
197 "number_of_ip_address": 1,
198 "ip_address_assignment": True
202 "description": "sriov port",
203 "layer_protocol": "ipv4",
204 "virtual_network_interface_requirements": [
209 "support_mandatory": False,
211 "description": "sriov"
217 "support_mandatory": False,
219 "description": "normal"
223 "bitrate_requirement": 10
228 "vnfSoftwareVersion": "1.0.0",
229 "vnfProductName": "zte",
230 "localizationLanguage": [
234 "vnfProvider": "zte",
236 "defaultLocalizationLanguage": "english",
237 "vnfdId": "zte-hss-1.0",
239 "vnfProductInfoDescription": "hss",
240 "vnfdVersion": "1.0.0",
241 "vnfProductInfoName": "hss"
@mock.patch.object(toscaparser, 'parse_vnfd')
def test_upload_vnf_pkg(self, mock_parse_vnfd):
    """PUT a file as package content and check the stored VNFD id.

    ``toscaparser.parse_vnfd`` is patched to return the JSON-encoded
    ``self.vnfd_data`` fixture. The test asserts that package "222" ends
    up with ``vnfdId == "zte-hss-1.0"`` and that the request answers
    202 ACCEPTED.
    """
    VnfPackageModel.objects.create(
        vnfPackageId="222",
        onboardingState="CREATED"
    )
    mock_parse_vnfd.return_value = json.JSONEncoder().encode(self.vnfd_data)

    # The context manager closes the upload handle even if the request
    # raises; the original left the file object open.
    with open(os.path.join(CATALOG_ROOT_PATH, "empty.txt"), "rb") as upload:
        response = self.client.put(
            "/api/vnfpkgm/v1/vnf_packages/222/package_content",
            data={'file': upload}
        )

    pkg = VnfPackageModel.objects.filter(vnfPackageId="222")[0]
    try:
        self.assertEqual("zte-hss-1.0", pkg.vnfdId)
        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
    finally:
        # Delete the stored file and the package directory even when an
        # assertion fails. Paths that were never created are skipped so a
        # cleanup error cannot replace the real assertion failure.
        if pkg.localFilePath and os.path.exists(pkg.localFilePath):
            os.remove(pkg.localFilePath)
        pkg_dir = os.path.join(CATALOG_ROOT_PATH, pkg.vnfPackageId)
        if os.path.isdir(pkg_dir):
            os.removedirs(pkg_dir)
@mock.patch.object(toscaparser, 'parse_vnfd')
@mock.patch.object(urllib2, 'urlopen')
def test_upload_nf_pkg_from_uri(self, mock_urlopen, mock_parse_vnfd):
    """Run the upload thread against a URI and check the stored VNFD id.

    ``urllib2.urlopen`` is patched to return a ``MockReq`` instance and
    ``toscaparser.parse_vnfd`` to return the JSON-encoded
    ``self.vnfd_data`` fixture. ``VnfPkgUploadThread.run()`` is called
    directly, so it executes synchronously in the test.
    """
    vnf_pkg = VnfPackageModel.objects.create(
        vnfPackageId="222",
        onboardingState="CREATED"
    )
    mock_parse_vnfd.return_value = json.JSONEncoder().encode(self.vnfd_data)
    mock_urlopen.return_value = MockReq()

    req_data = {"addressInformation": "https://127.0.0.1:1234/sdc/v1/hss.csar"}
    vnf_pkg_id = vnf_pkg.vnfPackageId
    VnfPkgUploadThread(req_data, vnf_pkg_id).run()

    # Look the record up by the id that was just created rather than
    # repeating the literal, so the two cannot drift apart.
    pkg = VnfPackageModel.objects.filter(vnfPackageId=vnf_pkg_id)[0]
    try:
        self.assertEqual("zte-hss-1.0", pkg.vnfdId)
    finally:
        # Delete the stored file and the package directory even when the
        # assertion fails. Paths that were never created are skipped so a
        # cleanup error cannot replace the real assertion failure.
        if pkg.localFilePath and os.path.exists(pkg.localFilePath):
            os.remove(pkg.localFilePath)
        pkg_dir = os.path.join(CATALOG_ROOT_PATH, pkg.vnfPackageId)
        if os.path.isdir(pkg_dir):
            os.removedirs(pkg_dir)
def test_create_vnf_pkg(self):
    """POST a new VNF package and compare the JSON body and status code.

    The generated ``id`` is copied from the response into the expected
    payload, so only the remaining fields are really being checked.
    """
    payload = {"userDefinedData": {"a": "A"}}
    response = self.client.post(
        "/api/vnfpkgm/v1/vnf_packages",
        data=payload,
        format="json"
    )
    body = json.loads(response.content)

    expected = dict(
        [
            ("id", body.get("id")),
            ("onboardingState", "CREATED"),
            ("operationalState", "DISABLED"),
            ("usageState", "NOT_IN_USE"),
            ("userDefinedData", {"a": "A"}),
            ("_links", None),  # TODO
        ]
    )
    self.assertEqual(expected, body)
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
299 def test_query_single_vnf(self):
300 VnfPackageModel.objects.create(
302 vnfdId="zte-hss-1.0",
304 vnfdProductName="hss",
305 vnfSoftwareVersion="1.0.0",
307 checksum='{"algorithm":"111", "hash": "11"}',
308 onboardingState="CREATED",
309 operationalState="DISABLED",
310 usageState="NOT_IN_USE",
311 userDefinedData='{"a": "A"}'
313 response = self.client.get("/api/vnfpkgm/v1/vnf_packages/222")
316 "vnfdId": "zte-hss-1.0",
317 "vnfProductName": "hss",
318 "vnfSoftwareVersion": "1.0.0",
319 "vnfdVersion": "1.0.0",
320 "checksum": {"algorithm": "111", "hash": "11"},
321 "softwareImages": None,
322 "additionalArtifacts": None,
323 "onboardingState": "CREATED",
324 "operationalState": "DISABLED",
325 "usageState": "NOT_IN_USE",
326 "userDefinedData": {"a": "A"},
329 self.assertEqual(response.data, expect_data)
330 self.assertEqual(response.status_code, status.HTTP_200_OK)
332 def test_query_multiple_vnf(self):
333 VnfPackageModel.objects.create(
335 vnfdId="zte-hss-1.0",
337 vnfdProductName="hss",
338 vnfSoftwareVersion="1.0.0",
340 checksum='{"algorithm":"111", "hash": "11"}',
341 onboardingState="CREATED",
342 operationalState="DISABLED",
343 usageState="NOT_IN_USE",
344 userDefinedData='{"a": "A"}'
346 VnfPackageModel.objects.create(
348 vnfdId="zte-hss-1.0",
350 vnfdProductName="hss",
351 vnfSoftwareVersion="1.0.0",
353 checksum='{"algorithm":"111", "hash": "11"}',
354 onboardingState="CREATED",
355 operationalState="DISABLED",
356 usageState="NOT_IN_USE",
357 userDefinedData='{"a": "A"}'
359 response = self.client.get("/api/vnfpkgm/v1/vnf_packages")
363 "vnfdId": "zte-hss-1.0",
364 "vnfProductName": "hss",
365 "vnfSoftwareVersion": "1.0.0",
366 "vnfdVersion": "1.0.0",
367 "checksum": {"algorithm": "111", "hash": "11"},
368 "softwareImages": None,
369 "additionalArtifacts": None,
370 "onboardingState": "CREATED",
371 "operationalState": "DISABLED",
372 "usageState": "NOT_IN_USE",
373 "userDefinedData": {"a": "A"},
378 "vnfdId": "zte-hss-1.0",
379 "vnfProductName": "hss",
380 "vnfSoftwareVersion": "1.0.0",
381 "vnfdVersion": "1.0.0",
382 "checksum": {"algorithm": "111", "hash": "11"},
383 "softwareImages": None,
384 "additionalArtifacts": None,
385 "onboardingState": "CREATED",
386 "operationalState": "DISABLED",
387 "usageState": "NOT_IN_USE",
388 "userDefinedData": {"a": "A"},
392 self.assertEqual(response.data, expect_data)
393 self.assertEqual(response.status_code, status.HTTP_200_OK)
395 def test_delete_single_vnf_pkg(self):
396 VnfPackageModel.objects.create(
398 vnfdId="zte-hss-1.0",
400 vnfdProductName="hss",
401 vnfSoftwareVersion="1.0.0",
403 checksum='{"algorithm":"111", "hash": "11"}',
404 onboardingState="CREATED",
405 operationalState="DISABLED",
406 usageState="NOT_IN_USE",
407 userDefinedData='{"a": "A"}'
409 response = self.client.delete("/api/vnfpkgm/v1/vnf_packages/222")
410 self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
411 self.assertEqual(response.data, None)
413 def test_fetch_vnf_pkg(self):
def test_fetch_partical_vnf_pkg(self):
    """GET package content with ``RANGE="4-7"`` and expect ``BBBB`` back.

    A 16-byte file is written to the working directory and registered as
    the local file of onboarded package "222". The streamed response
    chunks are joined and compared with the expected four bytes.
    """
    csar_path = "vnfPackage.csar"
    with open(csar_path, "wb") as fp:
        # write() stores the payload in one call; writelines() on a single
        # string iterates it character by character.
        fp.write(b"AAAABBBBCCCCDDDD")
    # Registered immediately so the scratch file is removed even when a
    # later assertion fails.
    self.addCleanup(os.remove, csar_path)

    VnfPackageModel.objects.create(
        vnfPackageId="222",
        onboardingState="ONBOARDED",
        localFilePath=csar_path
    )
    response = self.client.get("/api/vnfpkgm/v1/vnf_packages/222/package_content", RANGE="4-7")

    # Bytes literals equal plain str on Python 2 and stay correct if the
    # stream yields bytes on Python 3.
    partial_file_content = b"".join(response.streaming_content)
    self.assertEqual(response.status_code, status.HTTP_200_OK)
    self.assertEqual(b'BBBB', partial_file_content)