X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=catalog%2Fpackages%2Ftests%2Ftest_vnfpackage.py;h=0b8bb25cbb96091a6f3048cc42fe5d16097d54d4;hb=ac2f763bcf8ed41d9f2867a33b29860318854502;hp=37697b05ceff06217b8d93c7ea5f04b8f318970c;hpb=7aadf85f43a98e34958e2efe2e62136a81c16db5;p=modeling%2Fetsicatalog.git diff --git a/catalog/packages/tests/test_vnfpackage.py b/catalog/packages/tests/test_vnfpackage.py index 37697b0..0b8bb25 100644 --- a/catalog/packages/tests/test_vnfpackage.py +++ b/catalog/packages/tests/test_vnfpackage.py @@ -13,8 +13,11 @@ # limitations under the License. import json - import mock +import os +import catalog.pub.utils.timeutil +from requests.auth import HTTPBasicAuth + from django.test import TestCase from rest_framework import status from rest_framework.test import APIClient @@ -25,6 +28,9 @@ from catalog.pub.database.models import VnfPackageModel from catalog.pub.msapi import sdc from catalog.pub.utils import restcall, toscaparser from .const import vnfd_data +from catalog.pub.config.config import CATALOG_ROOT_PATH +from catalog.packages import const +from catalog.pub.config import config as pub_config class TestNfPackage(TestCase): @@ -258,3 +264,118 @@ class TestNfPackage(TestCase): ) self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) self.assertEqual(resp.data, {"error": "VNF CSAR(1) does not exist."}) + + @mock.patch("requests.post") + @mock.patch.object(sdc, 'download_artifacts') + @mock.patch.object(sdc, 'get_artifact') + @mock.patch("requests.get") + @mock.patch("uuid.uuid4") + @mock.patch.object(catalog.pub.utils.timeutil, "now_time") + def test_service_pkg_distribute_and_notify(self, mock_nowtime, mock_uuid, mock_requests_get, mock_get_artifact, + mock_download_artifacts, mock_requests_post): + mock_nowtime.return_value = "2019-02-16 14:41:16" + uuid_subscriptid = "99442b18-a5c7-11e8-998c-bf1755941f13" + uuid_csarid = "1234" + mock_uuid.side_effect = [uuid_subscriptid, "1111", "2222"] + mock_requests_get.return_value.status_code = 204 + vnf_subscription_data = { + "filter": { + "notificationTypes": [ + "VnfPackageOnboardingNotification", + "VnfPackageChangeNotification" + ], + "vnfdId": [ + "b1bb0ce7-2222-4fa7-95ed-4840d70a1177" + ], + "operationalState": ["ENABLED", "DISABLED"] + }, + "callbackUri": "https://so-vnfm-simulator.onap:9093/vnfpkgm/v1/notification", + "authentication": { + "authType": [ + "BASIC" + ], + "paramsBasic": { + "userName": "vnfm", + "password": "password1$" + } + } + } + response = self.client.post( + "/api/vnfpkgm/v1/subscriptions", + data=vnf_subscription_data, + format='json') + self.assertEqual(201, response.status_code) + mock_download_artifacts.return_value = os.path.join(CATALOG_ROOT_PATH, "Sol004VnfCSAR.csar") + mock_get_artifact.return_value = { + "uuid": "c94490a0-f7ef-48be-b3f8-8d8662a37236", + "invariantUUID": "63eaec39-ffbe-411c-a838-448f2c73f7eb", + "name": "Sol004VnfCSAR", + "version": "2.0", + "toscaModelURL": "/sdc/v1/catalog/resources/c94490a0-f7ef-48be-b3f8-8d8662a37236/toscaModel", + "category": "Volte", + "subCategory": "VolteVF", + "resourceType": "VF", + "lifecycleState": "CERTIFIED", + "lastUpdaterUserId": "jh0003" + } + NfDistributeThread(csar_id=uuid_csarid, vim_ids=["1"], lab_vim_id="", job_id="4").on_distribute() + print(VnfPackageModel.objects.all().values()) + expect_onboarding_notification = { + 'id': "1111", + 'notificationType': const.PKG_NOTIFICATION_TYPE.ONBOARDING, + 'timeStamp': "2019-02-16 14:41:16", + 'vnfPkgId': "1234", + 'vnfdId': "b1bb0ce7-2222-4fa7-95ed-4840d70a1177", + '_links': { + 'vnfPackage': { + 'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP, + pub_config.MSB_SERVICE_PORT, + const.PKG_URL_PREFIX, + uuid_csarid)}, + 'subscription': { + 'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP, + pub_config.MSB_SERVICE_PORT, + const.VNFPKG_SUBSCRIPTION_ROOT_URI, + uuid_subscriptid)} + + }, + "subscriptionId": uuid_subscriptid + } + mock_requests_post.return_value.status_code = 204 + mock_requests_post.assert_called_with(vnf_subscription_data["callbackUri"], data=expect_onboarding_notification, + headers={'Connection': 'close', + 'content-type': 'application/json', + 'accept': 'application/json'}, + auth=HTTPBasicAuth("vnfm", "password1$"), + verify=False) + mock_requests_post.return_value.status_code = 204 + expect_deleted_notification = { + 'id': "2222", + 'notificationType': const.PKG_NOTIFICATION_TYPE.CHANGE, + 'timeStamp': "2019-02-16 14:41:16", + 'vnfPkgId': "1234", + 'vnfdId': "b1bb0ce7-2222-4fa7-95ed-4840d70a1177", + 'changeType': const.PKG_CHANGE_TYPE.PKG_DELETE, + 'operationalState': None, + "subscriptionId": uuid_subscriptid, + '_links': { + 'subscription': { + 'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP, + pub_config.MSB_SERVICE_PORT, + const.VNFPKG_SUBSCRIPTION_ROOT_URI, + uuid_subscriptid)}, + 'vnfPackage': { + 'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP, + pub_config.MSB_SERVICE_PORT, + const.PKG_URL_PREFIX, + uuid_csarid) + } + } + } + NfPkgDeleteThread(csar_id=uuid_csarid, job_id="5").delete_csar() + mock_requests_post.assert_called_with(vnf_subscription_data["callbackUri"], data=expect_deleted_notification, + headers={'Connection': 'close', + 'content-type': 'application/json', + 'accept': 'application/json'}, + auth=HTTPBasicAuth("vnfm", "password1$"), + verify=False)