1 # Copyright 2017 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
18 from django.test import TestCase
19 from rest_framework import status
20 from rest_framework.test import APIClient
22 from catalog.packages.biz.sdc_vnf_package import NfDistributeThread, NfPkgDeleteThread
23 from catalog.pub.database.models import JobStatusModel, JobModel
24 from catalog.pub.database.models import VnfPackageModel
25 from catalog.pub.msapi import sdc
26 from catalog.pub.utils import restcall, toscaparser
27 from .const import vnfd_data
30 class TestNfPackage(TestCase):
32 self.client = APIClient()
33 VnfPackageModel.objects.filter().delete()
34 JobModel.objects.filter().delete()
35 JobStatusModel.objects.filter().delete()
36 self.vnfd_data = vnfd_data
41 def assert_job_result(self, job_id, job_progress, job_detail):
42 jobs = JobStatusModel.objects.filter(
44 progress=job_progress,
46 self.assertEqual(1, len(jobs))
48 @mock.patch.object(NfDistributeThread, 'run')
49 def test_nf_pkg_distribute_normal(self, mock_run):
50 resp = self.client.post(
51 "/api/catalog/v1/vnfpackages",
58 self.assertEqual(resp.status_code, status.HTTP_202_ACCEPTED)
60 def test_nf_pkg_distribute_when_csar_already_exist(self):
63 vnfdId="vcpe_vfw_zte_1_0"
71 self.assert_job_result("2", 255, "NF CSAR(1) already exists.")
73 @mock.patch.object(restcall, 'call_req')
74 @mock.patch.object(sdc, 'download_artifacts')
75 @mock.patch.object(toscaparser, 'parse_vnfd')
76 def test_nf_pkg_distribute_when_vnfd_already_exist(self,
78 mock_download_artifacts,
80 mock_parse_vnfd.return_value = json.JSONEncoder().encode(self.vnfd_data)
81 mock_download_artifacts.return_value = "/home/hss.csar"
82 mock_call_req.return_value = [0, json.JSONEncoder().encode([{
84 "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/hss.csar"
86 VnfPackageModel(vnfPackageId="2", vnfdId="zte-hss-1.0").save()
93 self.assert_job_result("2", 255, "VNF package(zte-hss-1.0) already exists.")
95 @mock.patch.object(restcall, 'call_req')
96 @mock.patch.object(sdc, 'download_artifacts')
97 @mock.patch.object(toscaparser, 'parse_vnfd')
98 def test_nf_pkg_distribute_successfully(self,
100 mock_download_artifacts,
102 mock_parse_vnfd.return_value = json.JSONEncoder().encode(self.vnfd_data)
103 mock_download_artifacts.return_value = "/home/hss.csar"
104 mock_call_req.return_value = [0, json.JSONEncoder().encode([{
106 "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/hss.csar"
114 self.assert_job_result("4", 100, "CSAR(1) distribute successfully.")
116 ###############################################################################################################
@mock.patch.object(NfPkgDeleteThread, 'run')
def test_nf_pkg_delete_normal(self, mock_run):
    """Check that deleting VNF package 1 over the REST API answers 202.

    ``NfPkgDeleteThread.run`` is replaced by a mock for the duration of
    the test, so only the HTTP status code of the response is verified.
    ``mock_run`` is the injected mock; it is not inspected here.
    """
    delete_response = self.client.delete("/api/catalog/v1/vnfpackages/1")
    self.assertEqual(delete_response.status_code, status.HTTP_202_ACCEPTED)
123 def test_nf_pkg_normal_delete(self):
126 vnfdId="vcpe_vfw_zte_1_0"
132 self.assert_job_result("2", 100, "Delete CSAR(2) successfully.")
134 def test_nf_pkg_get_all(self):
140 vnfSoftwareVersion='',
149 vnfSoftwareVersion='',
153 resp = self.client.get("/api/catalog/v1/vnfpackages")
154 self.assertEqual(resp.status_code, status.HTTP_200_OK)
164 "downloadUrl": "http://127.0.0.1:8806/static/catalog/3/",
178 "downloadUrl": "http://127.0.0.1:8806/static/catalog/4/",
185 self.assertEqual(expect_data, resp.data)
187 def test_nf_pkg_get_one(self):
193 vnfSoftwareVersion='',
198 resp = self.client.get("/api/catalog/v1/vnfpackages/4")
199 self.assertEqual(resp.status_code, status.HTTP_200_OK)
208 "downloadUrl": "http://127.0.0.1:8806/static/catalog/4/",
214 self.assertEqual(expect_data, resp.data)
216 def test_nf_pkg_get_one_failed(self):
222 vnfSoftwareVersion='',
227 resp = self.client.get("/api/catalog/v1/vnfpackages/2")
228 self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
229 self.assertEqual({'error': 'Vnf package[2] not Found.'}, resp.data)
231 ###############################################################################################################
233 @mock.patch.object(toscaparser, 'parse_vnfd')
234 def test_vnfd_parse_normal(self, mock_parse_vnfd):
239 mock_parse_vnfd.return_value = json.JSONEncoder().encode({"c": "d"})
244 resp = self.client.post(
245 "/api/parser/v1/parservnfd",
249 self.assertEqual(resp.status_code, status.HTTP_202_ACCEPTED)
250 self.assertEqual({"model": '{"c": "d"}'}, resp.data)
252 def test_vnfd_parse_when_csar_not_exist(self):
253 req_data = {"csarId": "1", "inputs": []}
254 resp = self.client.post(
255 "/api/parser/v1/parservnfd",
259 self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
260 self.assertEqual(resp.data, {"error": "VNF CSAR(1) does not exist."})