fix but for ETSI Catalog Manager needs to trigger a notification after storing ETSI...
[modeling/etsicatalog.git] / catalog / packages / tests / test_vnfpackage.py
index 37697b0..0b8bb25 100644 (file)
 # limitations under the License.
 
 import json
-
 import mock
+import os
+import catalog.pub.utils.timeutil
+from requests.auth import HTTPBasicAuth
+
 from django.test import TestCase
 from rest_framework import status
 from rest_framework.test import APIClient
@@ -25,6 +28,9 @@ from catalog.pub.database.models import VnfPackageModel
 from catalog.pub.msapi import sdc
 from catalog.pub.utils import restcall, toscaparser
 from .const import vnfd_data
+from catalog.pub.config.config import CATALOG_ROOT_PATH
+from catalog.packages import const
+from catalog.pub.config import config as pub_config
 
 
 class TestNfPackage(TestCase):
@@ -258,3 +264,118 @@ class TestNfPackage(TestCase):
         )
         self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
         self.assertEqual(resp.data, {"error": "VNF CSAR(1) does not exist."})
+
+    @mock.patch("requests.post")
+    @mock.patch.object(sdc, 'download_artifacts')
+    @mock.patch.object(sdc, 'get_artifact')
+    @mock.patch("requests.get")
+    @mock.patch("uuid.uuid4")
+    @mock.patch.object(catalog.pub.utils.timeutil, "now_time")
+    def test_service_pkg_distribute_and_notify(self, mock_nowtime, mock_uuid, mock_requests_get, mock_get_artifact,
+                                               mock_download_artifacts, mock_requests_post):
+        mock_nowtime.return_value = "2019-02-16 14:41:16"
+        uuid_subscriptid = "99442b18-a5c7-11e8-998c-bf1755941f13"
+        uuid_csarid = "1234"
+        mock_uuid.side_effect = [uuid_subscriptid, "1111", "2222"]
+        mock_requests_get.return_value.status_code = 204
+        vnf_subscription_data = {
+            "filter": {
+                "notificationTypes": [
+                    "VnfPackageOnboardingNotification",
+                    "VnfPackageChangeNotification"
+                ],
+                "vnfdId": [
+                    "b1bb0ce7-2222-4fa7-95ed-4840d70a1177"
+                ],
+                "operationalState": ["ENABLED", "DISABLED"]
+            },
+            "callbackUri": "https://so-vnfm-simulator.onap:9093/vnfpkgm/v1/notification",
+            "authentication": {
+                "authType": [
+                    "BASIC"
+                ],
+                "paramsBasic": {
+                    "userName": "vnfm",
+                    "password": "password1$"
+                }
+            }
+        }
+        response = self.client.post(
+            "/api/vnfpkgm/v1/subscriptions",
+            data=vnf_subscription_data,
+            format='json')
+        self.assertEqual(201, response.status_code)
+        mock_download_artifacts.return_value = os.path.join(CATALOG_ROOT_PATH, "Sol004VnfCSAR.csar")
+        mock_get_artifact.return_value = {
+            "uuid": "c94490a0-f7ef-48be-b3f8-8d8662a37236",
+            "invariantUUID": "63eaec39-ffbe-411c-a838-448f2c73f7eb",
+            "name": "Sol004VnfCSAR",
+            "version": "2.0",
+            "toscaModelURL": "/sdc/v1/catalog/resources/c94490a0-f7ef-48be-b3f8-8d8662a37236/toscaModel",
+            "category": "Volte",
+            "subCategory": "VolteVF",
+            "resourceType": "VF",
+            "lifecycleState": "CERTIFIED",
+            "lastUpdaterUserId": "jh0003"
+        }
+        NfDistributeThread(csar_id=uuid_csarid, vim_ids=["1"], lab_vim_id="", job_id="4").on_distribute()
+        print(VnfPackageModel.objects.all().values())
+        expect_onboarding_notification = {
+            'id': "1111",
+            'notificationType': const.PKG_NOTIFICATION_TYPE.ONBOARDING,
+            'timeStamp': "2019-02-16 14:41:16",
+            'vnfPkgId': "1234",
+            'vnfdId': "b1bb0ce7-2222-4fa7-95ed-4840d70a1177",
+            '_links': {
+                'vnfPackage': {
+                    'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP,
+                                                                 pub_config.MSB_SERVICE_PORT,
+                                                                 const.PKG_URL_PREFIX,
+                                                                 uuid_csarid)},
+                    'subscription': {
+                        'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP,
+                                                       pub_config.MSB_SERVICE_PORT,
+                                                       const.VNFPKG_SUBSCRIPTION_ROOT_URI,
+                                                       uuid_subscriptid)}
+
+            },
+            "subscriptionId": uuid_subscriptid
+        }
+        mock_requests_post.return_value.status_code = 204
+        mock_requests_post.assert_called_with(vnf_subscription_data["callbackUri"], data=expect_onboarding_notification,
+                                              headers={'Connection': 'close',
+                                                       'content-type': 'application/json',
+                                                       'accept': 'application/json'},
+                                              auth=HTTPBasicAuth("vnfm", "password1$"),
+                                              verify=False)
+        mock_requests_post.return_value.status_code = 204
+        expect_deleted_notification = {
+            'id': "2222",
+            'notificationType': const.PKG_NOTIFICATION_TYPE.CHANGE,
+            'timeStamp': "2019-02-16 14:41:16",
+            'vnfPkgId': "1234",
+            'vnfdId': "b1bb0ce7-2222-4fa7-95ed-4840d70a1177",
+            'changeType': const.PKG_CHANGE_TYPE.PKG_DELETE,
+            'operationalState': None,
+            "subscriptionId": uuid_subscriptid,
+            '_links': {
+                'subscription': {
+                    'href': 'http://%s:%s/%s%s' % (pub_config.MSB_SERVICE_IP,
+                                                   pub_config.MSB_SERVICE_PORT,
+                                                   const.VNFPKG_SUBSCRIPTION_ROOT_URI,
+                                                   uuid_subscriptid)},
+                'vnfPackage': {
+                    'href': 'http://%s:%s/%s/vnf_packages/%s' % (pub_config.MSB_SERVICE_IP,
+                                                                 pub_config.MSB_SERVICE_PORT,
+                                                                 const.PKG_URL_PREFIX,
+                                                                 uuid_csarid)
+                }
+            }
+        }
+        NfPkgDeleteThread(csar_id=uuid_csarid, job_id="5").delete_csar()
+        mock_requests_post.assert_called_with(vnf_subscription_data["callbackUri"], data=expect_deleted_notification,
+                                              headers={'Connection': 'close',
+                                                       'content-type': 'application/json',
+                                                       'accept': 'application/json'},
+                                              auth=HTTPBasicAuth("vnfm", "password1$"),
+                                              verify=False)