ADD UT Issue-ID: VFC-1429 Signed-off-by: zhuerlei <zhu.erlei@zte.com.cn>
[vfc/nfvo/lcm.git] / lcm / ns_vnfs / tests / tests.py
index b4c60cf..b490432 100644 (file)
@@ -20,7 +20,7 @@ from django.test import TestCase, Client
 from rest_framework import status
 
 from lcm.pub.database.models import VLInstModel, NfInstModel, JobModel, NSInstModel, VmInstModel, \
-    OOFDataModel, VNFCInstModel, PortInstModel, CPInstModel
+    OOFDataModel, VNFCInstModel, PortInstModel, CPInstModel, SubscriptionModel
 from lcm.pub.exceptions import NSLCMException
 from lcm.pub.utils import restcall
 from lcm.jobs.enum import JOB_MODEL_STATUS, JOB_TYPE, JOB_ACTION, JOB_PROGRESS
@@ -93,10 +93,13 @@ class TestTerminateVnfViews(TestCase):
                                    vimid='{"cloud_owner": "VCPE", "cloud_regionid": "RegionOne"}',
                                    instid=self.nf_inst_id
                                    )
+        SubscriptionModel(vnf_instance_filter=self.nf_inst_id, callback_uri="", links="").save()
 
     def tearDown(self):
         NSInstModel.objects.all().delete()
         NfInstModel.objects.all().delete()
+        VmInstModel.objects.all().delete()
+        SubscriptionModel.objects.all().delete()
 
     @mock.patch.object(TerminateVnfs, "run")
     def test_terminate_vnf_url(self, mock_run):
@@ -109,8 +112,7 @@ class TestTerminateVnfViews(TestCase):
 
     @mock.patch.object(time, "sleep")
     @mock.patch.object(restcall, "call_req")
-    @mock.patch.object(SubscriptionDeletion, "send_subscription_deletion_request")
-    def test_terminate_vnf(self, mock_send_subscription_deletion_request, mock_call_req, mock_sleep):
+    def test_terminate_vnf(self, mock_call_req, mock_sleep):
         job_id = JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.TERMINATE, self.nf_inst_id)
         job_info = {
             "jobId": job_id,
@@ -124,7 +126,9 @@ class TestTerminateVnfViews(TestCase):
             "/api/ztevnfmdriver/v1/1/jobs/" + job_id + "?responseId=0":
                 [0, json.JSONEncoder().encode(job_info), "200"],
             "/api/resmgr/v1/vnf/1":
-                [0, json.JSONEncoder().encode({"jobId": job_id}), "200"]
+                [0, json.JSONEncoder().encode({"jobId": job_id}), "200"],
+            "api/gvnfmdriver/v1/1/subscriptions/":
+                [0, json.JSONEncoder().encode({}), "200"]
         }
 
         def side_effect(*args):
@@ -138,6 +142,62 @@ class TestTerminateVnfViews(TestCase):
             self.assertEqual(1, 1)
         self.assertEqual(JobModel.objects.get(jobid=job_id).progress, 100)
 
+    @mock.patch.object(time, "sleep")
+    @mock.patch.object(restcall, "call_req")
+    @mock.patch.object(SubscriptionDeletion, "send_subscription_deletion_request")
+    def test_terminate_vnf_when_no_vnf_uuid(self, mock_send_subscription_deletion_request, mock_call_req, mock_sleep):
+        nf_inst_id = "test_terminate_vnf_when_no_vnf_uuid"
+        NSInstModel(id=nf_inst_id, name="ns_name_2").save()
+        NfInstModel.objects.create(nfinstid=nf_inst_id,
+                                   vnfm_inst_id="2",
+                                   status="active",
+                                   vnfd_model=self.vnfd_model
+                                   )
+        VmInstModel.objects.create(vmid="2",
+                                   vimid='{"cloud_owner": "VCPE", "cloud_regionid": "RegionOne"}',
+                                   instid=nf_inst_id
+                                   )
+        job_id = JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.TERMINATE, nf_inst_id)
+        job_info = {
+            "jobId": job_id,
+            "responsedescriptor": {"status": JOB_MODEL_STATUS.FINISHED}
+        }
+        mock_vals = {
+            "/external-system/esr-vnfm-list/esr-vnfm/2?depth=all":
+                [0, json.JSONEncoder().encode(vnfm_info), "200"],
+            "/api/ztevnfmdriver/v1/2/vnfs/None/terminate":
+                [0, json.JSONEncoder().encode({"jobId": job_id}), "200"],
+            "/api/ztevnfmdriver/v1/2/jobs/" + job_id + "?responseId=0":
+                [0, json.JSONEncoder().encode(job_info), "200"],
+            "/api/resmgr/v1/vnf/%s" % nf_inst_id:
+                [0, json.JSONEncoder().encode({"jobId": job_id}), "200"]
+        }
+
+        def side_effect(*args):
+            return mock_vals[args[4]]
+
+        mock_call_req.side_effect = side_effect
+        TerminateVnfs(self.data, nf_inst_id, job_id).run()
+        nfinst = NfInstModel.objects.filter(nfinstid=nf_inst_id)
+        if nfinst:
+            self.assertEqual(1, 0)
+        else:
+            self.assertEqual(1, 1)
+        self.assertEqual(JobModel.objects.get(jobid=job_id).progress, 100)
+
+    @mock.patch.object(time, "sleep")
+    @mock.patch.object(restcall, "call_req")
+    @mock.patch.object(SubscriptionDeletion, "send_subscription_deletion_request")
+    def test_terminate_vnf_when_nf_not_exists(self, mock_send_subscription_deletion_request, mock_call_req, mock_sleep):
+        job_id = JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.TERMINATE, self.nf_inst_id)
+        TerminateVnfs(self.data, "nf_not_exists", job_id).run()
+        nfinst = NfInstModel.objects.filter(nfinstid="nf_not_exists")
+        if nfinst:
+            self.assertEqual(1, 0)
+        else:
+            self.assertEqual(1, 1)
+        self.assertEqual(JobModel.objects.get(jobid=job_id).progress, 255)
+
     def test_terminate_vnf_when_vnf_is_dealing(self):
         NfInstModel.objects.filter(nfinstid=self.nf_inst_id).update(status=VNF_STATUS.TERMINATING)
         job_id = JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.TERMINATE, self.nf_inst_id)
@@ -553,6 +613,8 @@ class TestPlaceVnfViews(TestCase):
         self.vnf_inst_id = "1234"
         self.vnf_id = "vG"
         self.client = Client()
+        self.url = "/api/nslcm/v1/ns/placevnf"
+        self.data = vnf_place_request
         OOFDataModel.objects.all().delete()
         OOFDataModel.objects.create(
             request_id="1234",
@@ -577,6 +639,7 @@ class TestPlaceVnfViews(TestCase):
             "flavorId": "12345",
             "directive": []
         }]
+        # response = self.client.post(self.url, data=self.data)
         PlaceVnfs(vnf_place_request).extract()
         db_info = OOFDataModel.objects.filter(request_id=vnf_place_request.get("requestId"), transaction_id=vnf_place_request.get("transactionId"))
         self.assertEqual(db_info[0].request_status, "completed")
@@ -1150,7 +1213,7 @@ class TestCreateVnfViews(TestCase):
                                                                            "statusdescription": "creating",
                                                                            "errorcode": "0"}]}}), "200"],
             "api/gvnfmdriver/v1/1/subscriptions":
-                [0, json.JSONEncoder().encode({}), "200"],
+                [0, json.JSONEncoder().encode(subscription_response_data), "200"],
             "/api/resmgr/v1/vnfinfo":
                 [0, json.JSONEncoder().encode(subscription_response_data), "200"],
 
@@ -1292,6 +1355,55 @@ class TestCreateVnfViews(TestCase):
         ret = OOFDataModel.objects.filter(request_id="1234", transaction_id="1234")
         self.assertIsNotNone(ret)
 
+    @mock.patch.object(time, "sleep")
+    @mock.patch.object(restcall, "call_req")
+    def test_create_vnf_thread_sucess_when_failed_to_subscribe_from_vnfm(self, mock_call_req, mock_sleep):
+        mock_sleep.return_value = None
+        nf_inst_id, job_id = create_vnfs.prepare_create_params()
+        mock_vals = {
+            "/api/catalog/v1/vnfpackages/zte_vbras":
+                [0, json.JSONEncoder().encode(nf_package_info), "200"],
+            "/external-system/esr-vnfm-list/esr-vnfm/1?depth=all":
+                [0, json.JSONEncoder().encode(vnfm_info), "200"],
+            "/api/ztevnfmdriver/v1/1/vnfs":
+                [0, json.JSONEncoder().encode({"jobId": self.job_id, "vnfInstanceId": 3}), "200"],
+            "/api/oof/v1/placement":
+                [0, json.JSONEncoder().encode({}), "202"],
+            "/api/resmgr/v1/vnf":
+                [0, json.JSONEncoder().encode({}), "200"],
+            "/api/ztevnfmdriver/v1/1/jobs/" + self.job_id + "?responseId=0":
+                [0, json.JSONEncoder().encode({"jobid": self.job_id,
+                                               "responsedescriptor": {"progress": "100",
+                                                                      "status": JOB_MODEL_STATUS.FINISHED,
+                                                                      "responseid": "3",
+                                                                      "statusdescription": "creating",
+                                                                      "errorcode": "0",
+                                                                      "responsehistorylist": [
+                                                                          {"progress": "0",
+                                                                           "status": JOB_MODEL_STATUS.PROCESSING,
+                                                                           "responseid": "2",
+                                                                           "statusdescription": "creating",
+                                                                           "errorcode": "0"}]}}), "200"],
+            "api/gvnfmdriver/v1/1/subscriptions":
+                [1, json.JSONEncoder().encode(subscription_response_data), "200"],
+            "/api/resmgr/v1/vnfinfo":
+                [0, json.JSONEncoder().encode(subscription_response_data), "200"],
+        }
+
+        def side_effect(*args):
+            return mock_vals[args[4]]
+
+        mock_call_req.side_effect = side_effect
+        data = {
+            "ns_instance_id": ignore_case_get(self.data, "nsInstanceId"),
+            "additional_param_for_ns": ignore_case_get(self.data, "additionalParamForNs"),
+            "additional_param_for_vnf": ignore_case_get(self.data, "additionalParamForVnf"),
+            "vnf_index": ignore_case_get(self.data, "vnfIndex")
+        }
+        CreateVnfs(data, nf_inst_id, job_id).run()
+        self.assertEqual(NfInstModel.objects.get(nfinstid=nf_inst_id).status, VNF_STATUS.ACTIVE)
+        self.assertEqual(JobModel.objects.get(jobid=job_id).progress, JOB_PROGRESS.FINISHED)
+
 
 class TestUpdateVnfsViews(TestCase):
     def setUp(self):
@@ -2182,6 +2294,7 @@ class TestVnfNotifyView(TestCase):
         CPInstModel.objects.all().delete()
 
     def test_handle_vnf_lcm_ooc_notification_when_change_type_is_added(self):
+        # response = self.client.post(self.url, data=self.data)
         HandleVnfLcmOocNotification(self.vnfm_inst_id, self.m_nf_inst_id, self.data).do_biz()
         vnfc_inst = VNFCInstModel.objects.get(vnfcinstanceid="vnfc_instance_id", vduid="vdu_id",
                                               nfinstid=self.nf_inst_id, vmid="resource_id")