X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=catalog%2Fpackages%2Ftests%2Ftest_vnfpackage.py;h=0b8bb25cbb96091a6f3048cc42fe5d16097d54d4;hb=ac2f763bcf8ed41d9f2867a33b29860318854502;hp=0d8cbadb1042f5e954c9f67a404deb5dd234d805;hpb=5fd2ae6a4517980ce0f89247bb354220748ae67b;p=modeling%2Fetsicatalog.git diff --git a/catalog/packages/tests/test_vnfpackage.py b/catalog/packages/tests/test_vnfpackage.py index 0d8cbad..0b8bb25 100644 --- a/catalog/packages/tests/test_vnfpackage.py +++ b/catalog/packages/tests/test_vnfpackage.py @@ -14,15 +14,23 @@ import json import mock -from rest_framework.test import APIClient +import os +import catalog.pub.utils.timeutil +from requests.auth import HTTPBasicAuth + from django.test import TestCase from rest_framework import status +from rest_framework.test import APIClient + from catalog.packages.biz.sdc_vnf_package import NfDistributeThread, NfPkgDeleteThread from catalog.pub.database.models import JobStatusModel, JobModel from catalog.pub.database.models import VnfPackageModel from catalog.pub.msapi import sdc from catalog.pub.utils import restcall, toscaparser from .const import vnfd_data +from catalog.pub.config.config import CATALOG_ROOT_PATH +from catalog.packages import const +from catalog.pub.config import config as pub_config class TestNfPackage(TestCase): @@ -81,14 +89,14 @@ class TestNfPackage(TestCase): "uuid": "1", "toscaModelURL": "https://127.0.0.1:1234/sdc/v1/hss.csar" }]), '200'] - VnfPackageModel(vnfPackageId="2", vnfdId="zte-hss-1.0").save() + VnfPackageModel(vnfPackageId="2", vnfdId="00342b18-a5c7-11e8-998c-bf1755941f12").save() NfDistributeThread( csar_id="1", vim_ids=["1"], lab_vim_id="", job_id="2" ).run() - self.assert_job_result("2", 255, "VNF package(zte-hss-1.0) already exists.") + self.assert_job_result("2", 255, "VNF package(00342b18-a5c7-11e8-998c-bf1755941f12) already exists.") @mock.patch.object(restcall, 'call_req') @mock.patch.object(sdc, 'download_artifacts') @@ -240,7 +248,7 @@ class TestNfPackage(TestCase): "inputs": [] } resp = self.client.post( - "/api/catalog/v1/parservnfd", + "/api/parser/v1/parservnfd", req_data, format='json' ) @@ -250,9 +258,124 @@ class TestNfPackage(TestCase): def test_vnfd_parse_when_csar_not_exist(self): req_data = {"csarId": "1", "inputs": []} resp = self.client.post( - "/api/catalog/v1/parservnfd", + "/api/parser/v1/parservnfd", req_data, format='json' ) self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) self.assertEqual(resp.data, {"error": "VNF CSAR(1) does not exist."}) + + @mock.patch("requests.post") + @mock.patch.object(sdc, 'download_artifacts') + @mock.patch.object(sdc, 'get_artifact') + @mock.patch("requests.get") + @mock.patch("uuid.uuid4") + @mock.patch.object(catalog.pub.utils.timeutil, "now_time") + def test_service_pkg_distribute_and_notify(self, mock_nowtime, mock_uuid, mock_requests_get, mock_get_artifact, + mock_download_artifacts, mock_requests_post): + mock_nowtime.return_value = "2019-02-16 14:41:16" + uuid_subscriptid = "99442b18-a5c7-11e8-998c-bf1755941f13" + uuid_csarid = "1234" + mock_uuid.side_effect = [uuid_subscriptid, "1111", "2222"] + mock_requests_get.return_value.status_code = 204 + vnf_subscription_data = { + "filter": { + "notificationTypes": [ + "VnfPackageOnboardingNotification", + "VnfPackageChangeNotification" + ], + "vnfdId": [ + "b1bb0ce7-2222-4fa7-95ed-4840d70a1177" + ], + "operationalState": ["ENABLED", "DISABLED"] + }, + "callbackUri": "https://so-vnfm-simulator.onap:9093/vnfpkgm/v1/notification", + "authentication": { + "authType": [ + "BASIC" + ], + "paramsBasic": { + "userName": "vnfm", + "password": "password1$" + } + } + } + response = self.client.post( + "/api/vnfpkgm/v1/subscriptions", + data=vnf_subscription_data, + format='json') + self.assertEqual(201, response.status_code) + mock_download_artifacts.return_value = os.path.join(CATALOG_ROOT_PATH, "Sol004VnfCSAR.csar") + mock_get_artifact.return_value = { + "uuid": "c94490a0-f7ef-48be-b3f8-8d8662a37236", + "invariantUUID": "63eaec39-ffbe-411c-a838-448f2c73f7eb", + "name": "Sol004VnfCSAR", + "version": "2.0", + "toscaModelURL": "/sdc/v1/catalog/resources/c94490a0-f7ef-48be-b3f8-8d8662a37236/toscaModel", + "category": "Volte", + "subCategory": "VolteVF", + "resourceType": "VF", + "lifecycleState": "CERTIFIED", + "lastUpdaterUserId": "jh0003" + } + NfDistributeThread(csar_id=uuid_csarid, vim_ids=["1"], lab_vim_id="", job_id="4").on_distribute() + print(VnfPackageModel.objects.all().values()) + expect_onboarding_notification = { + 'id': "1111", + 'notificationType': const.PKG_NOTIFICATION_TYPE.ONBOARDING, + 'timeStamp': "2019-02-16 14:41:16", + 'vnfPkgId': "1234", + 'vnfdId': "b1bb0ce7-2222-4fa7-95ed-4840d70a1177", + '_links': { + 'vnfPackage': { + 'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP, + pub_config.MSB_SERVICE_PORT, + const.PKG_URL_PREFIX, + uuid_csarid)}, + 'subscription': { + 'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP, + pub_config.MSB_SERVICE_PORT, + const.VNFPKG_SUBSCRIPTION_ROOT_URI, + uuid_subscriptid)} + + }, + "subscriptionId": uuid_subscriptid + } + mock_requests_post.return_value.status_code = 204 + mock_requests_post.assert_called_with(vnf_subscription_data["callbackUri"], data=expect_onboarding_notification, + headers={'Connection': 'close', + 'content-type': 'application/json', + 'accept': 'application/json'}, + auth=HTTPBasicAuth("vnfm", "password1$"), + verify=False) + mock_requests_post.return_value.status_code = 204 + expect_deleted_notification = { + 'id': "2222", + 'notificationType': const.PKG_NOTIFICATION_TYPE.CHANGE, + 'timeStamp': "2019-02-16 14:41:16", + 'vnfPkgId': "1234", + 'vnfdId': "b1bb0ce7-2222-4fa7-95ed-4840d70a1177", + 'changeType': const.PKG_CHANGE_TYPE.PKG_DELETE, + 'operationalState': None, + "subscriptionId": uuid_subscriptid, + '_links': { + 'subscription': { + 'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP, + pub_config.MSB_SERVICE_PORT, + const.VNFPKG_SUBSCRIPTION_ROOT_URI, + uuid_subscriptid)}, + 'vnfPackage': { + 'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP, + pub_config.MSB_SERVICE_PORT, + const.PKG_URL_PREFIX, + uuid_csarid) + } + } + } + NfPkgDeleteThread(csar_id=uuid_csarid, job_id="5").delete_csar() + mock_requests_post.assert_called_with(vnf_subscription_data["callbackUri"], data=expect_deleted_notification, + headers={'Connection': 'close', + 'content-type': 'application/json', + 'accept': 'application/json'}, + auth=HTTPBasicAuth("vnfm", "password1$"), + verify=False)