SOL003 API ALign
[vfc/gvnfm/vnflcm.git] / lcm / lcm / nf / tests / test_instantiate_vnf.py
diff --git a/lcm/lcm/nf/tests/test_instantiate_vnf.py b/lcm/lcm/nf/tests/test_instantiate_vnf.py
new file mode 100644 (file)
index 0000000..7cf2646
--- /dev/null
@@ -0,0 +1,166 @@
+# Copyright 2017 ZTE Corporation.\r
+#\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+#         http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+\r
+import json\r
+import uuid\r
+\r
+import mock\r
+from django.test import TestCase\r
+from rest_framework import status\r
+from rest_framework.test import APIClient\r
+\r
+from lcm.nf.biz.instantiate_vnf import InstantiateVnf\r
+from lcm.nf.const import c1_data_get_tenant_id, c4_data_create_network, c2_data_create_volume, \\r
+    c5_data_create_subnet, c3_data_get_volume, c6_data_create_port, c7_data_create_flavor, c8_data_list_image, \\r
+    c9_data_create_vm, c10_data_get_vm, inst_req_data, vnfpackage_info\r
+from lcm.pub.database.models import NfInstModel, JobStatusModel\r
+from lcm.pub.utils import restcall\r
+from lcm.pub.utils.jobutil import JobUtil\r
+from lcm.pub.utils.timeutil import now_time\r
+from lcm.pub.vimapi import api\r
+\r
+\r
+class TestNFInstantiate(TestCase):\r
+    def setUp(self):\r
+        self.client = APIClient()\r
+        self.grant_result = {\r
+            "vimid": 'vimid_1',\r
+            "tenant": 'tenantname_1'\r
+        }\r
+\r
+    def tearDown(self):\r
+        pass\r
+\r
+    def assert_job_result(self, job_id, job_progress, job_detail):\r
+        jobs = JobStatusModel.objects.filter(jobid=job_id,\r
+                                             progress=job_progress,\r
+                                             descp=job_detail)\r
+        self.assertEqual(1, len(jobs))\r
+\r
+    @mock.patch.object(InstantiateVnf, 'run')\r
+    def test_instantiate_vnf(self, mock_run):\r
+        NfInstModel(nfinstid='12', nf_name='VNF1').save()\r
+        mock_run.re.return_value = None\r
+        response = self.client.post("/api/vnflcm/v1/vnf_instances/12/instantiate", data=inst_req_data, format='json')\r
+        self.failUnlessEqual(status.HTTP_202_ACCEPTED, response.status_code)\r
+\r
+    def test_instantiate_vnf_when_inst_id_not_exist(self):\r
+        self.nf_inst_id = str(uuid.uuid4())\r
+        self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
+        JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")\r
+        jobs = JobStatusModel.objects.filter(jobid=self.job_id,\r
+                                             progress=0,\r
+                                             descp="INST_VNF_READY")\r
+        self.assertEqual(1, len(jobs))\r
+        data = inst_req_data\r
+        InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()\r
+        self.assert_job_result(self.job_id, 255, "VNF nf_inst_id is not exist.")\r
+\r
+    @mock.patch.object(restcall, 'call_req')\r
+    def test_instantiate_vnf_when_get_packageinfo_by_csarid_failed(self, mock_call_req):\r
+        NfInstModel.objects.create(nfinstid='1111',\r
+                                   nf_name='vFW_01',\r
+                                   package_id='222',\r
+                                   version='',\r
+                                   vendor='',\r
+                                   netype='',\r
+                                   vnfd_model='',\r
+                                   status='NOT_INSTANTIATED',\r
+                                   nf_desc='vFW in Nanjing TIC Edge',\r
+                                   vnfdid='111',\r
+                                   create_time=now_time())\r
+        r1_get_vnfpackage_by_vnfdid = [1, json.JSONEncoder().encode(vnfpackage_info), '200']\r
+        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid]\r
+        self.nf_inst_id = '1111'\r
+        self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
+        JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")\r
+        data = inst_req_data\r
+        InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()\r
+        self.assert_job_result(self.job_id, 255, "Failed to query vnf CSAR(111) from catalog.")\r
+\r
+    @mock.patch.object(restcall, 'call_req')\r
+    def test_instantiate_vnf_when_applay_grant_failed(self, mock_call_req):\r
+        NfInstModel.objects.create(nfinstid='1111',\r
+                                   nf_name='vFW_01',\r
+                                   package_id='222',\r
+                                   version='',\r
+                                   vendor='',\r
+                                   netype='',\r
+                                   vnfd_model='',\r
+                                   status='NOT_INSTANTIATED',\r
+                                   nf_desc='vFW in Nanjing TIC Edge',\r
+                                   vnfdid='111',\r
+                                   create_time=now_time())\r
+        r1_get_vnfpackage_by_vnfdid = [0, json.JSONEncoder().encode(vnfpackage_info), '200']\r
+        r2_apply_grant_result = [1, json.JSONEncoder().encode(self.grant_result), '200']\r
+        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result]\r
+        self.nf_inst_id = '1111'\r
+        self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
+        JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")\r
+        data = inst_req_data\r
+        InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()\r
+        self.assert_job_result(self.job_id, 255, "Nf instancing apply grant exception")\r
+\r
+    @mock.patch.object(restcall, 'call_req')\r
+    @mock.patch.object(api, 'call')\r
+    def test_instantiate_vnf_when_unexpected_exception(self, mock_call, mock_call_req):\r
+        NfInstModel.objects.create(nfinstid='1111',\r
+                                   nf_name='vFW_01',\r
+                                   package_id='222',\r
+                                   version='',\r
+                                   vendor='',\r
+                                   netype='',\r
+                                   vnfd_model='',\r
+                                   status='NOT_INSTANTIATED',\r
+                                   nf_desc='vFW in Nanjing TIC Edge',\r
+                                   vnfdid='111',\r
+                                   create_time=now_time())\r
+        r1_get_vnfpackage_by_vnfdid = [0, json.JSONEncoder().encode(vnfpackage_info), '200']\r
+        r2_apply_grant_result = [0, json.JSONEncoder().encode(self.grant_result), '200']\r
+        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result]\r
+        mock_call.side_effect = [c1_data_get_tenant_id, c2_data_create_volume, c3_data_get_volume]\r
+        self.nf_inst_id = '1111'\r
+        self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
+        JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")\r
+        data = inst_req_data\r
+        InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()\r
+        self.assert_job_result(self.job_id, 255, "unexpected exception")\r
+\r
+    @mock.patch.object(restcall, 'call_req')\r
+    @mock.patch.object(api, 'call')\r
+    def test_instantiate_vnf_success(self, mock_call, mock_call_req):\r
+        NfInstModel.objects.create(nfinstid='1111',\r
+                                   nf_name='vFW_01',\r
+                                   package_id='222',\r
+                                   version='',\r
+                                   vendor='',\r
+                                   netype='',\r
+                                   vnfd_model='',\r
+                                   status='NOT_INSTANTIATED',\r
+                                   nf_desc='vFW in Nanjing TIC Edge',\r
+                                   vnfdid='111',\r
+                                   create_time=now_time())\r
+        r1_get_vnfpackage_by_vnfdid = [0, json.JSONEncoder().encode(vnfpackage_info), '200']\r
+        r2_apply_grant_result = [0, json.JSONEncoder().encode(self.grant_result), '200']\r
+        r3_lcm_notify_result = [0, json.JSONEncoder().encode(''), '200']\r
+        mock_call_req.side_effect = [r1_get_vnfpackage_by_vnfdid, r2_apply_grant_result, r3_lcm_notify_result]\r
+        mock_call.side_effect = [c1_data_get_tenant_id, c2_data_create_volume, c3_data_get_volume,\r
+                                 c4_data_create_network, c5_data_create_subnet, c6_data_create_port,\r
+                                 c7_data_create_flavor, c8_data_list_image, c9_data_create_vm, c10_data_get_vm]\r
+        self.nf_inst_id = '1111'\r
+        self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
+        JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")\r
+        data = inst_req_data\r
+        InstantiateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()\r
+        self.assert_job_result(self.job_id, 100, "Instantiate Vnf success.")\r