1 # Copyright 2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from rest_framework.test import APIClient
18 from django.test import TestCase
19 from rest_framework import status
20 from catalog.packages.biz.sdc_vnf_package import NfDistributeThread, NfPkgDeleteThread
21 from catalog.pub.database.models import JobStatusModel, JobModel
22 from catalog.pub.database.models import VnfPackageModel
23 from catalog.pub.msapi import sdc
24 from catalog.pub.utils import restcall, toscaparser
25 from .const import vnfd_data
28 class TestNfPackage(TestCase):
30 self.client = APIClient()
31 VnfPackageModel.objects.filter().delete()
32 JobModel.objects.filter().delete()
33 JobStatusModel.objects.filter().delete()
34 self.vnfd_data = vnfd_data
39 def assert_job_result(self, job_id, job_progress, job_detail):
40 jobs = JobStatusModel.objects.filter(
42 progress=job_progress,
44 self.assertEqual(1, len(jobs))
46 @mock.patch.object(NfDistributeThread, 'run')
47 def test_nf_pkg_distribute_normal(self, mock_run):
48 resp = self.client.post(
49 "/api/catalog/v1/vnfpackages",
56 self.assertEqual(resp.status_code, status.HTTP_202_ACCEPTED)
58 def test_nf_pkg_distribute_when_csar_already_exist(self):
61 vnfdId="vcpe_vfw_zte_1_0"
69 self.assert_job_result("2", 255, "NF CSAR(1) already exists.")
71 @mock.patch.object(restcall, 'call_req')
72 @mock.patch.object(sdc, 'download_artifacts')
73 @mock.patch.object(toscaparser, 'parse_vnfd')
74 def test_nf_pkg_distribute_when_vnfd_already_exist(self,
76 mock_download_artifacts,
78 mock_parse_vnfd.return_value = json.JSONEncoder().encode(self.vnfd_data)
79 mock_download_artifacts.return_value = "/home/hss.csar"
80 mock_call_req.return_value = [0, json.JSONEncoder().encode([{
82 "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/hss.csar"
84 VnfPackageModel(vnfPackageId="2", vnfdId="zte-hss-1.0").save()
91 self.assert_job_result("2", 255, "VNF package(zte-hss-1.0) already exists.")
93 @mock.patch.object(restcall, 'call_req')
94 @mock.patch.object(sdc, 'download_artifacts')
95 @mock.patch.object(toscaparser, 'parse_vnfd')
96 def test_nf_pkg_distribute_successfully(self,
98 mock_download_artifacts,
100 mock_parse_vnfd.return_value = json.JSONEncoder().encode(self.vnfd_data)
101 mock_download_artifacts.return_value = "/home/hss.csar"
102 mock_call_req.return_value = [0, json.JSONEncoder().encode([{
104 "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/hss.csar"
112 self.assert_job_result("4", 100, "CSAR(1) distribute successfully.")
114 ###############################################################################################################
@mock.patch.object(NfPkgDeleteThread, 'run')
def test_nf_pkg_delete_normal(self, mock_run):
    """Check that deleting a VNF package is acknowledged with 202.

    ``NfPkgDeleteThread.run`` is replaced by a mock for the duration of
    the test; only the HTTP status of the DELETE response is asserted.
    """
    # Issue the DELETE through the test client against package id "1".
    delete_response = self.client.delete("/api/catalog/v1/vnfpackages/1")
    self.assertEqual(
        delete_response.status_code,
        status.HTTP_202_ACCEPTED,
    )
121 def test_nf_pkg_normal_delete(self):
124 vnfdId="vcpe_vfw_zte_1_0"
130 self.assert_job_result("2", 100, "Delete CSAR(2) successfully.")
132 def test_nf_pkg_get_all(self):
138 vnfSoftwareVersion='',
147 vnfSoftwareVersion='',
151 resp = self.client.get("/api/catalog/v1/vnfpackages")
152 self.assertEqual(resp.status_code, status.HTTP_200_OK)
162 "downloadUrl": "http://127.0.0.1:8806/static/catalog/3/",
176 "downloadUrl": "http://127.0.0.1:8806/static/catalog/4/",
183 self.assertEqual(expect_data, resp.data)
185 def test_nf_pkg_get_one(self):
191 vnfSoftwareVersion='',
196 resp = self.client.get("/api/catalog/v1/vnfpackages/4")
197 self.assertEqual(resp.status_code, status.HTTP_200_OK)
206 "downloadUrl": "http://127.0.0.1:8806/static/catalog/4/",
212 self.assertEqual(expect_data, resp.data)
214 def test_nf_pkg_get_one_failed(self):
220 vnfSoftwareVersion='',
225 resp = self.client.get("/api/catalog/v1/vnfpackages/2")
226 self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
227 self.assertEqual({'error': 'Vnf package[2] not Found.'}, resp.data)
229 ###############################################################################################################
231 @mock.patch.object(toscaparser, 'parse_vnfd')
232 def test_vnfd_parse_normal(self, mock_parse_vnfd):
237 mock_parse_vnfd.return_value = json.JSONEncoder().encode({"c": "d"})
242 resp = self.client.post(
243 "/api/catalog/v1/parservnfd",
247 self.assertEqual(resp.status_code, status.HTTP_202_ACCEPTED)
248 self.assertEqual({"model": '{"c": "d"}'}, resp.data)
250 def test_vnfd_parse_when_csar_not_exist(self):
251 req_data = {"csarId": "1", "inputs": []}
252 resp = self.client.post(
253 "/api/catalog/v1/parservnfd",
257 self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
258 self.assertEqual(resp.data, {"error": "VNF CSAR(1) does not exist."})