Add test for pnf descriptor 89/63789/2
authorbiancunkang <bian.cunkang@zte.com.cn>
Thu, 30 Aug 2018 11:39:56 +0000 (19:39 +0800)
committerbiancunkang <bian.cunkang@zte.com.cn>
Thu, 30 Aug 2018 12:21:43 +0000 (20:21 +0800)
Add test for exception

Change-Id: Ie9e7e22aa4ac56a9e08e45903a0dc7a7eb0bee2f
Issue-ID: VFC-1038
Signed-off-by: biancunkang <bian.cunkang@zte.com.cn>
catalog/packages/biz/pnf_descriptor.py
catalog/packages/tests/test_pnf_descriptor.py
catalog/packages/views/pnf_descriptor_views.py

index 5cd20fd..188bc70 100644 (file)
@@ -28,61 +28,109 @@ from catalog.packages.const import PKG_STATUS
 logger = logging.getLogger(__name__)
 
 
-def create(data):
-    logger.info('Start to create a PNFD...')
-    user_defined_data = ignore_case_get(data, 'userDefinedData')
-    data = {
-        'id': str(uuid.uuid4()),
-        'pnfdOnboardingState': PKG_STATUS.CREATED,
-        'pnfdUsageState': PKG_STATUS.NOT_IN_USE,
-        'userDefinedData': user_defined_data,
-        '_links': None  # TODO
-    }
-    PnfPackageModel.objects.create(
-        pnfPackageId=data['id'],
-        onboardingState=data['pnfdOnboardingState'],
-        usageState=data['pnfdUsageState'],
-        userDefinedData=data['userDefinedData']
-    )
-    logger.info('A PNFD(%s) has been created.' % data['id'])
-    return data
-
-
-def query_multiple():
-    pnf_pkgs = PnfPackageModel.objects.all()
-    response_data = []
-    for pnf_pkg in pnf_pkgs:
-        data = fill_response_data(pnf_pkg)
-        response_data.append(data)
-    return response_data
-
-
-def query_single(pnfd_info_id):
-    pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
-    if not pnf_pkgs.exists():
-        logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
-        raise ResourceNotFoundException('PNFD(%s) does not exist.' % pnfd_info_id)
-    return fill_response_data(pnf_pkgs[0])
-
-
-def upload(remote_file, pnfd_info_id):
-    logger.info('Start to upload PNFD(%s)...' % pnfd_info_id)
-    pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
-    if not pnf_pkgs.exists():
-        logger.info('PNFD(%s) does not exist.' % pnfd_info_id)
-        raise CatalogException('PNFD (%s) does not exist.' % pnfd_info_id)
-
-    pnf_pkgs.update(onboardingState=PKG_STATUS.UPLOADING)
-    local_file_name = remote_file.name
-    local_file_dir = os.path.join(CATALOG_ROOT_PATH, pnfd_info_id)
-    local_file_name = os.path.join(local_file_dir, local_file_name)
-    if not os.path.exists(local_file_dir):
-        fileutil.make_dirs(local_file_dir)
-    with open(local_file_name, 'wb') as local_file:
-        for chunk in remote_file.chunks(chunk_size=1024 * 8):
-            local_file.write(chunk)
-    logger.info('PNFD(%s) content has been uploaded.' % pnfd_info_id)
-    return local_file_name
+class PnfPackage(object):
+
+    def __init__(self):
+        pass
+
+    def create(self, data):
+        logger.info('Start to create a PNFD...')
+        user_defined_data = ignore_case_get(data, 'userDefinedData')
+        data = {
+            'id': str(uuid.uuid4()),
+            'pnfdOnboardingState': PKG_STATUS.CREATED,
+            'pnfdUsageState': PKG_STATUS.NOT_IN_USE,
+            'userDefinedData': user_defined_data,
+            '_links': None  # TODO
+        }
+        PnfPackageModel.objects.create(
+            pnfPackageId=data['id'],
+            onboardingState=data['pnfdOnboardingState'],
+            usageState=data['pnfdUsageState'],
+            userDefinedData=data['userDefinedData']
+        )
+        logger.info('A PNFD(%s) has been created.' % data['id'])
+        return data
+
+    def query_multiple(self):
+        pnf_pkgs = PnfPackageModel.objects.all()
+        response_data = []
+        for pnf_pkg in pnf_pkgs:
+            data = fill_response_data(pnf_pkg)
+            response_data.append(data)
+        return response_data
+
+    def query_single(self, pnfd_info_id):
+        pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
+        if not pnf_pkgs.exists():
+            logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
+            raise ResourceNotFoundException('PNFD(%s) does not exist.' % pnfd_info_id)
+        return fill_response_data(pnf_pkgs[0])
+
+    def upload(self, remote_file, pnfd_info_id):
+        logger.info('Start to upload PNFD(%s)...' % pnfd_info_id)
+        pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
+        if not pnf_pkgs.exists():
+            logger.info('PNFD(%s) does not exist.' % pnfd_info_id)
+            raise CatalogException('PNFD (%s) does not exist.' % pnfd_info_id)
+
+        pnf_pkgs.update(onboardingState=PKG_STATUS.UPLOADING)
+        local_file_name = remote_file.name
+        local_file_dir = os.path.join(CATALOG_ROOT_PATH, pnfd_info_id)
+        local_file_name = os.path.join(local_file_dir, local_file_name)
+        if not os.path.exists(local_file_dir):
+            fileutil.make_dirs(local_file_dir)
+        with open(local_file_name, 'wb') as local_file:
+            for chunk in remote_file.chunks(chunk_size=1024 * 8):
+                local_file.write(chunk)
+        logger.info('PNFD(%s) content has been uploaded.' % pnfd_info_id)
+        return local_file_name
+
+    def delete_single(self, pnfd_info_id):
+        logger.info('Start to delete PNFD(%s)...' % pnfd_info_id)
+        pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
+        if not pnf_pkgs.exists():
+            logger.info('PNFD(%s) has been deleted.' % pnfd_info_id)
+            return
+        '''
+        if pnf_pkgs[0].usageState != PKG_STATUS.NOT_IN_USE:
+            logger.info('PNFD(%s) shall be NOT_IN_USE.' % pnfd_info_id)
+            raise CatalogException('PNFD(%s) shall be NOT_IN_USE.' % pnfd_info_id)
+        '''
+        ns_pkgs = NSPackageModel.objects.all()
+        for ns_pkg in ns_pkgs:
+            nsd_model = None
+            if ns_pkg.nsdModel:
+                nsd_model = json.JSONDecoder().decode(ns_pkg.nsdModel)
+            pnf_info_ids = []
+            for pnf in nsd_model['pnfs']:
+                pnfd_id = pnf["properties"]["id"]
+                pkgs = PnfPackageModel.objects.filter(pnfdId=pnfd_id)
+                for pkg in pkgs:
+                    pnf_info_ids.append(pkg.pnfPackageId)
+            if pnfd_info_id in pnf_info_ids:
+                logger.info('PNFD(%s) is referenced.' % pnfd_info_id)
+                raise CatalogException('PNFD(%s) is referenced.' % pnfd_info_id)
+
+        pnf_pkgs.delete()
+        pnf_pkg_path = os.path.join(CATALOG_ROOT_PATH, pnfd_info_id)
+        fileutil.delete_dirs(pnf_pkg_path)
+        logger.debug('PNFD(%s) has been deleted.' % pnfd_info_id)
+
+    def download(self, pnfd_info_id):
+        logger.info('Start to download PNFD(%s)...' % pnfd_info_id)
+        pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
+        if not pnf_pkgs.exists():
+            logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
+            raise ResourceNotFoundException('PNFD(%s) does not exist.' % pnfd_info_id)
+        if pnf_pkgs[0].onboardingState != PKG_STATUS.ONBOARDED:
+            logger.error('PNFD(%s) is not ONBOARDED.' % pnfd_info_id)
+            raise CatalogException('PNFD(%s) is not ONBOARDED.' % pnfd_info_id)
+        local_file_path = pnf_pkgs[0].localFilePath
+        local_file_name = local_file_path.split('/')[-1]
+        local_file_name = local_file_name.split('\\')[-1]
+        logger.info('PNFD(%s) has been downloaded.' % pnfd_info_id)
+        return local_file_path, local_file_name, os.path.getsize(local_file_path)
 
 
 def parse_pnfd_and_save(pnfd_info_id, local_file_name):
@@ -110,53 +158,6 @@ def parse_pnfd_and_save(pnfd_info_id, local_file_name):
     logger.info('PNFD(%s) has been processed.' % pnfd_info_id)
 
 
-def download(pnfd_info_id):
-    logger.info('Start to download PNFD(%s)...' % pnfd_info_id)
-    pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
-    if not pnf_pkgs.exists():
-        logger.error('PNFD(%s) does not exist.' % pnfd_info_id)
-        raise ResourceNotFoundException('PNFD(%s) does not exist.' % pnfd_info_id)
-    if pnf_pkgs[0].onboardingState != PKG_STATUS.ONBOARDED:
-        logger.error('PNFD(%s) is not ONBOARDED.' % pnfd_info_id)
-        raise CatalogException('PNFD(%s) is not ONBOARDED.' % pnfd_info_id)
-    local_file_path = pnf_pkgs[0].localFilePath
-    local_file_name = local_file_path.split('/')[-1]
-    local_file_name = local_file_name.split('\\')[-1]
-    logger.info('PNFD(%s) has been downloaded.' % pnfd_info_id)
-    return local_file_path, local_file_name, os.path.getsize(local_file_path)
-
-
-def delete_single(pnfd_info_id):
-    logger.info('Start to delete PNFD(%s)...' % pnfd_info_id)
-    pnf_pkgs = PnfPackageModel.objects.filter(pnfPackageId=pnfd_info_id)
-    if not pnf_pkgs.exists():
-        logger.info('PNFD(%s) has been deleted.' % pnfd_info_id)
-        return
-    '''
-    if pnf_pkgs[0].usageState != PKG_STATUS.NOT_IN_USE:
-        logger.info('PNFD(%s) shall be NOT_IN_USE.' % pnfd_info_id)
-        raise CatalogException('PNFD(%s) shall be NOT_IN_USE.' % pnfd_info_id)
-    '''
-    ns_pkgs = NSPackageModel.objects.all()
-    for ns_pkg in ns_pkgs:
-        if ns_pkg.nsdModel:
-            nsd_model = json.JSONDecoder().decode(ns_pkg.nsdModel)
-        pnf_info_ids = []
-        for pnf in nsd_model['pnfs']:
-            pnfd_id = pnf["properties"]["id"]
-            pkgs = PnfPackageModel.objects.filter(pnfdId=pnfd_id)
-            for pkg in pkgs:
-                pnf_info_ids.append(pkg.pnfPackageId)
-        if pnfd_info_id in pnf_info_ids:
-            logger.info('PNFD(%s) is referenced.' % pnfd_info_id)
-            raise CatalogException('PNFD(%s) is referenced.' % pnfd_info_id)
-
-    pnf_pkgs.delete()
-    pnf_pkg_path = os.path.join(CATALOG_ROOT_PATH, pnfd_info_id)
-    fileutil.delete_dirs(pnf_pkg_path)
-    logger.debug('PNFD(%s) has been deleted.' % pnfd_info_id)
-
-
 def fill_response_data(pnf_pkg):
     data = {
         'id': pnf_pkg.pnfPackageId,
index 195433e..0b8ce10 100644 (file)
@@ -27,6 +27,7 @@ from catalog.pub.utils import toscaparser
 from catalog.packages.const import PKG_STATUS
 from catalog.packages.tests.const import pnfd_data
 from catalog.pub.config.config import CATALOG_ROOT_PATH
+from catalog.packages.biz.pnf_descriptor import PnfPackage
 
 
 class TestPnfDescriptor(TestCase):
@@ -205,3 +206,44 @@ class TestPnfDescriptor(TestCase):
         self.assertEqual(resp.status_code, status.HTTP_200_OK)
         self.assertEqual('test1test2', file_content)
         os.remove('pnfd_content.txt')
+
+    def test_pnfd_download_failed(self):
+        response = self.client.get("/api/nsd/v1/pnf_descriptors/22/pnfd_content")
+        self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
+
+    @mock.patch.object(PnfPackage, "create")
+    def test_pnfd_create_when_catch_exception(self, mock_create):
+        request_data = {'userDefinedData': self.user_defined_data}
+        mock_create.side_effect = TypeError('integer type')
+        response = self.client.post('/api/nsd/v1/pnf_descriptors', data=request_data, format='json')
+        self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+    @mock.patch.object(PnfPackage, "delete_single")
+    def test_delete_single_when_catch_exception(self, mock_delete_single):
+        mock_delete_single.side_effect = TypeError("integer type")
+        response = self.client.delete("/api/nsd/v1/pnf_descriptors/22", format='json')
+        self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+    @mock.patch.object(PnfPackage, "query_single")
+    def test_query_single_when_catch_exception(self, mock_query_single):
+        mock_query_single.side_effect = TypeError("integer type")
+        response = self.client.get('/api/nsd/v1/pnf_descriptors/22', format='json')
+        self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+    @mock.patch.object(PnfPackage, "query_multiple")
+    def test_query_multiple_when_catch_exception(self, mock_query_muitiple):
+        mock_query_muitiple.side_effect = TypeError("integer type")
+        response = self.client.get('/api/nsd/v1/pnf_descriptors', format='json')
+        self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+    @mock.patch.object(PnfPackage, "upload")
+    def test_upload_when_catch_exception(self, mock_upload):
+        mock_upload.side_effect = TypeError("integer type")
+        response = self.client.put("/api/nsd/v1/pnf_descriptors/22/pnfd_content")
+        self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+    @mock.patch.object(PnfPackage, "download")
+    def test_download_when_catch_exception(self, mock_download):
+        mock_download.side_effect = TypeError("integer type")
+        response = self.client.get("/api/nsd/v1/pnf_descriptors/22/pnfd_content")
+        self.assertEqual(response.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
index b0d02a8..d660092 100644 (file)
@@ -17,8 +17,7 @@ import traceback
 
 from django.http import FileResponse
 
-from catalog.packages.biz.pnf_descriptor import create, delete_single, download, query_multiple, query_single, upload, \
-    parse_pnfd_and_save, handle_upload_failed
+from catalog.packages.biz.pnf_descriptor import PnfPackage, parse_pnfd_and_save, handle_upload_failed
 from catalog.packages.serializers.create_pnfd_info_request import CreatePnfdInfoRequestSerializer
 from catalog.packages.serializers.pnfd_info import PnfdInfoSerializer
 from catalog.packages.serializers.pnfd_infos import PnfdInfosSerializer
@@ -55,7 +54,7 @@ def pnfd_info_rd(request, pnfdInfoId):  # TODO
     if request.method == 'GET':
         logger.debug("Query an individual PNF descriptor> %s" % request.data)
         try:
-            data = query_single(pnfdInfoId)
+            data = PnfPackage().query_single(pnfdInfoId)
             pnfd_info = validate_data(data, PnfdInfoSerializer)
             return Response(data=pnfd_info.data, status=status.HTTP_200_OK)
         except ResourceNotFoundException as e:
@@ -73,7 +72,7 @@ def pnfd_info_rd(request, pnfdInfoId):  # TODO
     if request.method == 'DELETE':
         logger.debug("Delete an individual PNFD resource> %s" % request.data)
         try:
-            delete_single(pnfdInfoId)
+            PnfPackage().delete_single(pnfdInfoId)
             return Response(data=None, status=status.HTTP_204_NO_CONTENT)
         except CatalogException as e:
             logger.error(e.message)
@@ -108,7 +107,7 @@ def pnf_descriptors_rc(request, *args, **kwargs):
     if request.method == 'POST':
         try:
             create_pnfd_info_request = validate_data(request.data, CreatePnfdInfoRequestSerializer)
-            data = create(create_pnfd_info_request.data)
+            data = PnfPackage().create(create_pnfd_info_request.data)
             pnfd_info = validate_data(data, PnfdInfoSerializer)
             return Response(data=pnfd_info.data, status=status.HTTP_201_CREATED)
         except CatalogException as e:
@@ -122,7 +121,7 @@ def pnf_descriptors_rc(request, *args, **kwargs):
 
     if request.method == 'GET':
         try:
-            data = query_multiple()
+            data = PnfPackage().query_multiple()
             pnfd_infos = validate_data(data, PnfdInfosSerializer)
             return Response(data=pnfd_infos.data, status=status.HTTP_200_OK)
         except CatalogException as e:
@@ -160,7 +159,7 @@ def pnfd_content_ru(request, *args, **kwargs):
     if request.method == 'PUT':
         files = request.FILES.getlist('file')
         try:
-            local_file_name = upload(files[0], pnfd_info_id)
+            local_file_name = PnfPackage().upload(files[0], pnfd_info_id)
             parse_pnfd_and_save(pnfd_info_id, local_file_name)
             return Response(data=None, status=status.HTTP_204_NO_CONTENT)
         except CatalogException as e:
@@ -176,7 +175,7 @@ def pnfd_content_ru(request, *args, **kwargs):
 
     if request.method == 'GET':
         try:
-            file_path, file_name, file_size = download(pnfd_info_id)
+            file_path, file_name, file_size = PnfPackage().download(pnfd_info_id)
             response = FileResponse(open(file_path, 'rb'), status=status.HTTP_200_OK)
             response['Content-Disposition'] = 'attachment; filename=%s' % file_name.encode('utf-8')
             response['Content-Length'] = file_size