--- /dev/null
+# Copyright 2017 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+import mock
+
+from django.test import TestCase
+from lcm.pub.database.models import NSInstModel, ServiceBaseInfoModel
+from lcm.pub.utils import restcall
+from rest_framework import status
+from rest_framework.test import APIClient
+from lcm.ns.tests import NSD_MODEL_DICT_INST_NS_POST_DEAL_VIEW
+
+
+class NSInstPostDealView(TestCase):
+ def setUp(self):
+ self.client = APIClient()
+ self.url = "/api/nslcm/v1/ns/test_ns_instance_id/postdeal"
+ self.data = {"status": "true"}
+ NSInstModel(id="test_ns_instance_id", name="test_ns_instance_name", nsd_id="test_nsd_id",
+ nsd_invariant_id="test_nsd_invariant_id",
+ nsd_model=json.dumps(NSD_MODEL_DICT_INST_NS_POST_DEAL_VIEW)).save()
+ ServiceBaseInfoModel(service_id="test_ns_instance_id", service_name="service_name", service_type="service_type",
+ active_status="active_status", status="status", creator="creator", create_time="0").save()
+
+ def tearDown(self):
+ NSInstModel.objects.all().delete()
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_inst_ns_post_deal_view(self, mock_call_req):
+ ret = {"status": 1}
+ mock_vals = {
+ "api/polengine/v1/policyinfo":
+ [0, json.JSONEncoder().encode(ret), "200"],
+ }
+
+ def side_effect(*args):
+ return mock_vals[args[4]]
+
+ mock_call_req.side_effect = side_effect
+
+ response = self.client.post(self.url, data=self.data, format='json')
+ self.assertEqual(status.HTTP_202_ACCEPTED, response.status_code)
from rest_framework.test import APIClient
from lcm.ns.biz.ns_instant import BuildInWorkflowThread
from lcm.ns.biz.ns_instant import InstantNSService
-from lcm.pub.database.models import NSInstModel
+from lcm.pub.database.models import NSInstModel, WFPlanModel
from lcm.pub.utils import restcall
from lcm.pub.config import config
from lcm.ns.tests import NSD_MODEL_DICT, NSD_MODEL_WITH_PNF_DICT, VNFM_LIST_IN_AAI_DICT, VNFM_IN_AAI_DICT, JOB_DICT, INSTANTIATE_NS_DICT, INSTANTIATE_NS_WITH_PNF_DICT
self._mock_get_auto_id()
def tearDown(self):
- pass
+ NSInstModel.objects.all().delete()
+ WFPlanModel.objects.all().delete()
def _mock_get_auto_id(self):
fake_auto_id = mock.Mock()
ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
self.assertEqual(ack['status'], status.HTTP_200_OK)
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal')
+ @mock.patch.object(restcall, 'call_req')
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job')
+ @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
+ @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
+ def test_ns_instantiate_with_pnf_when_wso2(self, mock_updata_job, mock_call_req, mock_post_deal):
+ config.WORKFLOW_OPTION = "wso2"
+ NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created", nsd_id="nsd_id").save()
+ stpls = [{"id": "nsd_id", "csarId": "csar_id", "templateName": "template_name",
+ "serviceTemplateId": "service_template_id"}]
+ items = [{"name": ["init"], "processId": "process_id"}]
+ ret = {"status": 1}
+ mock_vals = {
+ "/api/catalog/v1/servicetemplates":
+ [0, json.JSONEncoder().encode(stpls), "200"],
+ "/api/catalog/v1/servicetemplates/service_template_id/operations":
+ [0, json.JSONEncoder().encode(items), "200"],
+ "/api/wso2bpel/v1/process/instance":
+ [0, json.JSONEncoder().encode(ret), "200"],
+ }
+
+ def side_effect(*args):
+ return mock_vals[args[4]]
+
+ mock_call_req.side_effect = side_effect
+ ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
+ self.assertEqual(ack['status'], status.HTTP_200_OK)
+
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal')
+ @mock.patch.object(restcall, 'call_req')
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job')
+ @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
+ @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
+ def test_ns_instantiate_with_pnf_when_activiti(self, mock_updata_job, mock_call_req, mock_post_deal):
+ config.WORKFLOW_OPTION = "activiti"
+ NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created", nsd_id="nsd_id").save()
+ WFPlanModel(deployed_id="1", process_id="process_id", message="message", status=1, plan_name="plan_name").save()
+ ret = {"status": 1}
+ mock_vals = {
+ "api/workflow/v1/process/instance":
+ [0, json.JSONEncoder().encode(ret), "200"],
+ }
+
+ def side_effect(*args):
+ return mock_vals[args[4]]
+
+ mock_call_req.side_effect = side_effect
+ ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
+ self.assertEqual(ack['status'], status.HTTP_200_OK)
+
@mock.patch.object(restcall, 'call_req')
@mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
@mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
from rest_framework import status
from lcm.pub.database.models import VLInstModel, NfInstModel, JobModel, NSInstModel, VmInstModel, \
- OOFDataModel, VNFCInstModel, PortInstModel, CPInstModel
+ OOFDataModel, VNFCInstModel, PortInstModel, CPInstModel, SubscriptionModel
from lcm.pub.exceptions import NSLCMException
from lcm.pub.utils import restcall
from lcm.jobs.enum import JOB_MODEL_STATUS, JOB_TYPE, JOB_ACTION, JOB_PROGRESS
vimid='{"cloud_owner": "VCPE", "cloud_regionid": "RegionOne"}',
instid=self.nf_inst_id
)
+ SubscriptionModel(vnf_instance_filter=self.nf_inst_id, callback_uri="", links="").save()
def tearDown(self):
NSInstModel.objects.all().delete()
NfInstModel.objects.all().delete()
+ VmInstModel.objects.all().delete()
+ SubscriptionModel.objects.all().delete()
@mock.patch.object(TerminateVnfs, "run")
def test_terminate_vnf_url(self, mock_run):
@mock.patch.object(time, "sleep")
@mock.patch.object(restcall, "call_req")
- @mock.patch.object(SubscriptionDeletion, "send_subscription_deletion_request")
- def test_terminate_vnf(self, mock_send_subscription_deletion_request, mock_call_req, mock_sleep):
+ def test_terminate_vnf(self, mock_call_req, mock_sleep):
job_id = JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.TERMINATE, self.nf_inst_id)
job_info = {
"jobId": job_id,
"/api/ztevnfmdriver/v1/1/jobs/" + job_id + "?responseId=0":
[0, json.JSONEncoder().encode(job_info), "200"],
"/api/resmgr/v1/vnf/1":
- [0, json.JSONEncoder().encode({"jobId": job_id}), "200"]
+ [0, json.JSONEncoder().encode({"jobId": job_id}), "200"],
+ "api/gvnfmdriver/v1/1/subscriptions/":
+ [0, json.JSONEncoder().encode({}), "200"]
}
def side_effect(*args):
self.assertEqual(1, 1)
self.assertEqual(JobModel.objects.get(jobid=job_id).progress, 100)
+ @mock.patch.object(time, "sleep")
+ @mock.patch.object(restcall, "call_req")
+ @mock.patch.object(SubscriptionDeletion, "send_subscription_deletion_request")
+ def test_terminate_vnf_when_no_vnf_uuid(self, mock_send_subscription_deletion_request, mock_call_req, mock_sleep):
+ nf_inst_id = "test_terminate_vnf_when_no_vnf_uuid"
+ NSInstModel(id=nf_inst_id, name="ns_name_2").save()
+ NfInstModel.objects.create(nfinstid=nf_inst_id,
+ vnfm_inst_id="2",
+ status="active",
+ vnfd_model=self.vnfd_model
+ )
+ VmInstModel.objects.create(vmid="2",
+ vimid='{"cloud_owner": "VCPE", "cloud_regionid": "RegionOne"}',
+ instid=nf_inst_id
+ )
+ job_id = JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.TERMINATE, nf_inst_id)
+ job_info = {
+ "jobId": job_id,
+ "responsedescriptor": {"status": JOB_MODEL_STATUS.FINISHED}
+ }
+ mock_vals = {
+ "/external-system/esr-vnfm-list/esr-vnfm/2?depth=all":
+ [0, json.JSONEncoder().encode(vnfm_info), "200"],
+ "/api/ztevnfmdriver/v1/2/vnfs/None/terminate":
+ [0, json.JSONEncoder().encode({"jobId": job_id}), "200"],
+ "/api/ztevnfmdriver/v1/2/jobs/" + job_id + "?responseId=0":
+ [0, json.JSONEncoder().encode(job_info), "200"],
+ "/api/resmgr/v1/vnf/%s" % nf_inst_id:
+ [0, json.JSONEncoder().encode({"jobId": job_id}), "200"]
+ }
+
+ def side_effect(*args):
+ return mock_vals[args[4]]
+
+ mock_call_req.side_effect = side_effect
+ TerminateVnfs(self.data, nf_inst_id, job_id).run()
+ nfinst = NfInstModel.objects.filter(nfinstid=nf_inst_id)
+ if nfinst:
+ self.assertEqual(1, 0)
+ else:
+ self.assertEqual(1, 1)
+ self.assertEqual(JobModel.objects.get(jobid=job_id).progress, 100)
+
+ @mock.patch.object(time, "sleep")
+ @mock.patch.object(restcall, "call_req")
+ @mock.patch.object(SubscriptionDeletion, "send_subscription_deletion_request")
+ def test_terminate_vnf_when_nf_not_exists(self, mock_send_subscription_deletion_request, mock_call_req, mock_sleep):
+ job_id = JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.TERMINATE, self.nf_inst_id)
+ TerminateVnfs(self.data, "nf_not_exists", job_id).run()
+ nfinst = NfInstModel.objects.filter(nfinstid="nf_not_exists")
+ if nfinst:
+ self.assertEqual(1, 0)
+ else:
+ self.assertEqual(1, 1)
+ self.assertEqual(JobModel.objects.get(jobid=job_id).progress, 255)
+
def test_terminate_vnf_when_vnf_is_dealing(self):
NfInstModel.objects.filter(nfinstid=self.nf_inst_id).update(status=VNF_STATUS.TERMINATING)
job_id = JobUtil.create_job(JOB_TYPE.VNF, JOB_ACTION.TERMINATE, self.nf_inst_id)
self.vnf_inst_id = "1234"
self.vnf_id = "vG"
self.client = Client()
+ self.url = "/api/nslcm/v1/ns/placevnf"
+ self.data = vnf_place_request
OOFDataModel.objects.all().delete()
OOFDataModel.objects.create(
request_id="1234",
"flavorId": "12345",
"directive": []
}]
+ # response = self.client.post(self.url, data=self.data)
PlaceVnfs(vnf_place_request).extract()
db_info = OOFDataModel.objects.filter(request_id=vnf_place_request.get("requestId"), transaction_id=vnf_place_request.get("transactionId"))
self.assertEqual(db_info[0].request_status, "completed")
"statusdescription": "creating",
"errorcode": "0"}]}}), "200"],
"api/gvnfmdriver/v1/1/subscriptions":
- [0, json.JSONEncoder().encode({}), "200"],
+ [0, json.JSONEncoder().encode(subscription_response_data), "200"],
"/api/resmgr/v1/vnfinfo":
[0, json.JSONEncoder().encode(subscription_response_data), "200"],
ret = OOFDataModel.objects.filter(request_id="1234", transaction_id="1234")
self.assertIsNotNone(ret)
+ @mock.patch.object(time, "sleep")
+ @mock.patch.object(restcall, "call_req")
+ def test_create_vnf_thread_sucess_when_failed_to_subscribe_from_vnfm(self, mock_call_req, mock_sleep):
+ mock_sleep.return_value = None
+ nf_inst_id, job_id = create_vnfs.prepare_create_params()
+ mock_vals = {
+ "/api/catalog/v1/vnfpackages/zte_vbras":
+ [0, json.JSONEncoder().encode(nf_package_info), "200"],
+ "/external-system/esr-vnfm-list/esr-vnfm/1?depth=all":
+ [0, json.JSONEncoder().encode(vnfm_info), "200"],
+ "/api/ztevnfmdriver/v1/1/vnfs":
+ [0, json.JSONEncoder().encode({"jobId": self.job_id, "vnfInstanceId": 3}), "200"],
+ "/api/oof/v1/placement":
+ [0, json.JSONEncoder().encode({}), "202"],
+ "/api/resmgr/v1/vnf":
+ [0, json.JSONEncoder().encode({}), "200"],
+ "/api/ztevnfmdriver/v1/1/jobs/" + self.job_id + "?responseId=0":
+ [0, json.JSONEncoder().encode({"jobid": self.job_id,
+ "responsedescriptor": {"progress": "100",
+ "status": JOB_MODEL_STATUS.FINISHED,
+ "responseid": "3",
+ "statusdescription": "creating",
+ "errorcode": "0",
+ "responsehistorylist": [
+ {"progress": "0",
+ "status": JOB_MODEL_STATUS.PROCESSING,
+ "responseid": "2",
+ "statusdescription": "creating",
+ "errorcode": "0"}]}}), "200"],
+ "api/gvnfmdriver/v1/1/subscriptions":
+ [1, json.JSONEncoder().encode(subscription_response_data), "200"],
+ "/api/resmgr/v1/vnfinfo":
+ [0, json.JSONEncoder().encode(subscription_response_data), "200"],
+ }
+
+ def side_effect(*args):
+ return mock_vals[args[4]]
+
+ mock_call_req.side_effect = side_effect
+ data = {
+ "ns_instance_id": ignore_case_get(self.data, "nsInstanceId"),
+ "additional_param_for_ns": ignore_case_get(self.data, "additionalParamForNs"),
+ "additional_param_for_vnf": ignore_case_get(self.data, "additionalParamForVnf"),
+ "vnf_index": ignore_case_get(self.data, "vnfIndex")
+ }
+ CreateVnfs(data, nf_inst_id, job_id).run()
+ self.assertEqual(NfInstModel.objects.get(nfinstid=nf_inst_id).status, VNF_STATUS.ACTIVE)
+ self.assertEqual(JobModel.objects.get(jobid=job_id).progress, JOB_PROGRESS.FINISHED)
+
class TestUpdateVnfsViews(TestCase):
def setUp(self):
CPInstModel.objects.all().delete()
def test_handle_vnf_lcm_ooc_notification_when_change_type_is_added(self):
+ # response = self.client.post(self.url, data=self.data)
HandleVnfLcmOocNotification(self.vnfm_inst_id, self.m_nf_inst_id, self.data).do_biz()
vnfc_inst = VNFCInstModel.objects.get(vnfcinstanceid="vnfc_instance_id", vduid="vdu_id",
nfinstid=self.nf_inst_id, vmid="resource_id")