from rest_framework.test import APIClient
from lcm.ns.biz.ns_instant import BuildInWorkflowThread
from lcm.ns.biz.ns_instant import InstantNSService
-from lcm.pub.database.models import NSInstModel
+from lcm.pub.database.models import NSInstModel, WFPlanModel
from lcm.pub.utils import restcall
from lcm.pub.config import config
from lcm.ns.tests import NSD_MODEL_DICT, NSD_MODEL_WITH_PNF_DICT, VNFM_LIST_IN_AAI_DICT, VNFM_IN_AAI_DICT, JOB_DICT, INSTANTIATE_NS_DICT, INSTANTIATE_NS_WITH_PNF_DICT
self._mock_get_auto_id()
def tearDown(self):
- pass
+ NSInstModel.objects.all().delete()
+ WFPlanModel.objects.all().delete()
def _mock_get_auto_id(self):
fake_auto_id = mock.Mock()
ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
self.assertEqual(ack['status'], status.HTTP_200_OK)
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal')
+ @mock.patch.object(restcall, 'call_req')
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job')
+ @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
+ @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
+ def test_ns_instantiate_with_pnf_when_wso2(self, mock_updata_job, mock_call_req, mock_post_deal):
+ config.WORKFLOW_OPTION = "wso2"
+ NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created", nsd_id="nsd_id").save()
+ stpls = [{"id": "nsd_id", "csarId": "csar_id", "templateName": "template_name",
+ "serviceTemplateId": "service_template_id"}]
+ items = [{"name": ["init"], "processId": "process_id"}]
+ ret = {"status": 1}
+ mock_vals = {
+ "/api/catalog/v1/servicetemplates":
+ [0, json.JSONEncoder().encode(stpls), "200"],
+ "/api/catalog/v1/servicetemplates/service_template_id/operations":
+ [0, json.JSONEncoder().encode(items), "200"],
+ "/api/wso2bpel/v1/process/instance":
+ [0, json.JSONEncoder().encode(ret), "200"],
+ }
+
+ def side_effect(*args):
+ return mock_vals[args[4]]
+
+ mock_call_req.side_effect = side_effect
+ ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
+ self.assertEqual(ack['status'], status.HTTP_200_OK)
+
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal')
+ @mock.patch.object(restcall, 'call_req')
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job')
+ @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
+ @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
+ def test_ns_instantiate_with_pnf_when_activiti(self, mock_updata_job, mock_call_req, mock_post_deal):
+ config.WORKFLOW_OPTION = "activiti"
+ NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created", nsd_id="nsd_id").save()
+ WFPlanModel(deployed_id="1", process_id="process_id", message="message", status=1, plan_name="plan_name").save()
+ ret = {"status": 1}
+ mock_vals = {
+ "api/workflow/v1/process/instance":
+ [0, json.JSONEncoder().encode(ret), "200"],
+ }
+
+ def side_effect(*args):
+ return mock_vals[args[4]]
+
+ mock_call_req.side_effect = side_effect
+ ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
+ self.assertEqual(ack['status'], status.HTTP_200_OK)
+
@mock.patch.object(restcall, 'call_req')
@mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
@mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))