import mock
from django.test import TestCase
+from requests.auth import HTTPBasicAuth
from rest_framework import status
from rest_framework.test import APIClient
-from requests.auth import HTTPBasicAuth
import catalog.pub.utils.timeutil
from catalog.packages import const
from catalog.packages.biz.notificationsutil import PkgNotifications
from catalog.packages.biz.vnf_pkg_subscription import QuerySubscription, TerminateSubscription
-from catalog.pub.config import config as pub_config
+from catalog.packages.tests.const import vnfd_data
from catalog.pub.config.config import CATALOG_ROOT_PATH
from catalog.pub.database.models import VnfPkgSubscriptionModel, VnfPackageModel
from catalog.pub.exceptions import SubscriptionDoesNotExistsException
from catalog.pub.utils import toscaparser
-from .const import vnf_subscription_data, vnfd_data
+from .const import vnf_subscription_data
class TestNfPackageSubscription(TestCase):
response.data["callbackUri"]
)
self.assertEqual(temp_uuid, response.data["id"])
- temp_uuid = "00442b18-a5c7-11e8-998c-bf1755941f12"
+ temp2_uuid = "00442b18-a5c7-11e8-998c-bf1755941f12"
mock_requests.return_value.status_code = 204
mock_requests.get.status_code = 204
- mock_uuid4.return_value = temp_uuid
+ mock_uuid4.return_value = temp2_uuid
response = self.client.post(
"/api/vnfpkgm/v1/subscriptions",
data=self.vnf_subscription_data,
format='json'
)
self.assertEqual(303, response.status_code)
+ redirect_addr = "/%s" % (os.path.join(const.VNFPKG_SUBSCRIPTION_ROOT_URI, temp_uuid))
+ self.assertEqual(redirect_addr, response["Location"])
+
+ @mock.patch("requests.get")
+ def test_callbackuri_same_subscriptions(self, mock_requests):
+ mock_requests.return_value.status_code = 204
+ mock_requests.get.status_code = 204
+ response = self.client.post(
+ "/api/vnfpkgm/v1/subscriptions",
+ data=self.vnf_subscription_data,
+ format='json'
+ )
+ self.assertEqual(201, response.status_code)
+ self.assertEqual(
+ self.vnf_subscription_data["callbackUri"],
+ response.data["callbackUri"]
+ )
+ newsubs = self.vnf_subscription_data
+ newsubs["filter"]["vnfdId"] = ["ssss"]
+ response = self.client.post(
+ "/api/vnfpkgm/v1/subscriptions",
+ data=self.vnf_subscription_data,
+ format='json'
+ )
+ self.assertEqual(201, response.status_code)
+ self.assertEqual(
+ newsubs["callbackUri"],
+ response.data["callbackUri"]
+ )
@mock.patch("requests.get")
@mock.patch.object(uuid, 'uuid4')
self.assertEqual(temp_uuid, response.data["id"])
@mock.patch("requests.get")
- @mock.patch.object(uuid, 'uuid4')
- def test_get_subscription_with_id_not_exists(self, mock_uuid4, mock_requests):
- temp_uuid = "99442b18-a5c7-11e8-998c-bf1755941f13"
- dummy_uuid = str(uuid.uuid4())
+ def test_get_subscription_with_id_not_exists(self, mock_requests):
mock_requests.return_value.status_code = 204
mock_requests.get.status_code = 204
- mock_uuid4.return_value = temp_uuid
self.client.post(
"/api/vnfpkgm/v1/subscriptions",
data=self.vnf_subscription_data,
format='json'
)
response = self.client.get(
- "/api/vnfpkgm/v1/subscriptions/%s" % dummy_uuid,
+ "/api/vnfpkgm/v1/subscriptions/111",
format='json'
)
self.assertEqual(404, response.status_code)
response = self.client.delete("/api/vnfpkgm/v1/subscriptions/%s" % temp_uuid)
self.assertEqual(204, response.status_code)
- @mock.patch("requests.get")
- @mock.patch.object(uuid, 'uuid4')
- def test_delete_subscription_with_id_not_exists(self, mock_uuid4, mock_requests):
+ def test_delete_subscription_with_id_not_exists(self):
dummy_uuid = str(uuid.uuid4())
response = self.client.delete("/api/vnfpkgm/v1/subscriptions/%s" % dummy_uuid)
self.assertEqual(404, response.status_code)
'timeStamp': "2019-02-16 14:41:16",
'vnfPkgId': uuid_vnfPackageId,
'vnfdId': uuid_vnfdid,
- "subscriptionId": uuid_subscriptid,
'_links': {
- 'subscription': {
- 'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP,
- pub_config.MSB_SERVICE_PORT,
- const.VNFPKG_SUBSCRIPTION_ROOT_URI,
- uuid_subscriptid)},
'vnfPackage': {
- 'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP,
- pub_config.MSB_SERVICE_PORT,
- const.PKG_URL_PREFIX,
- uuid_vnfPackageId)
- }
- }
+ 'href': '/%s/vnf_packages/%s' % (const.PKG_URL_PREFIX, uuid_vnfPackageId)
+ },
+ 'subscription': {
+ 'href': '/%s%s' % (const.VNFPKG_SUBSCRIPTION_ROOT_URI, uuid_subscriptid)}
+ },
+ "subscriptionId": uuid_subscriptid
}
- mock_requests_post.assert_called_with(vnf_subscription_data["callbackUri"], data=expect_notification,
- headers={'Connection': 'close'}, auth=HTTPBasicAuth("admin", "pwd1234"))
+ mock_requests_post.assert_called_with(vnf_subscription_data["callbackUri"],
+ data=json.dumps(expect_notification),
+ headers={'Connection': 'close',
+ 'content-type': 'application/json',
+ 'accept': 'application/json'},
+ auth=HTTPBasicAuth("admin", "pwd1234"),
+ verify=False)
def test_service_query_single_subscription_not_found(self):
try:
'timeStamp': "2019-12-16 14:41:16",
'vnfPkgId': "vnfpkgid1",
'vnfdId': "vnfdid1",
- 'changeType': const.PKG_CHANGE_TYPE.OP_STATE_CHANGE,
- 'operationalState': None,
- "subscriptionId": "1",
'_links': {
- 'subscription': {
- 'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP,
- pub_config.MSB_SERVICE_PORT,
- const.VNFPKG_SUBSCRIPTION_ROOT_URI,
- "1")},
'vnfPackage': {
- 'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP,
- pub_config.MSB_SERVICE_PORT,
- const.PKG_URL_PREFIX,
- "vnfpkgid1")
- }
- }
+ 'href': '/%s/vnf_packages/%s' % (const.PKG_URL_PREFIX, "vnfpkgid1")
+ },
+ 'subscription': {
+ 'href': '/%s%s' % (const.VNFPKG_SUBSCRIPTION_ROOT_URI, "1")}
+ },
+ 'changeType': const.PKG_CHANGE_TYPE.OP_STATE_CHANGE,
+ 'operationalState': None,
+ "subscriptionId": "1"
}
- mock_requests_post.assert_called_with(expect_callbackuri, data=expect_notification,
- headers={'Connection': 'close'})
+ mock_requests_post.assert_called_with(expect_callbackuri, data=json.dumps(expect_notification),
+ headers={'Connection': 'close',
+ 'content-type': 'application/json',
+ 'accept': 'application/json'},
+ verify=False)