# Copyright 2016 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json

import mock
from django.test import TestCase
from mock import MagicMock
from rest_framework import status
from rest_framework.test import APIClient

from lcm.ns.biz.ns_instant import BuildInWorkflowThread
from lcm.ns.biz.ns_instant import InstantNSService
from lcm.ns.tests import NSD_MODEL_DICT, NSD_MODEL_WITH_PNF_DICT, VNFM_LIST_IN_AAI_DICT, VNFM_IN_AAI_DICT, JOB_DICT, INSTANTIATE_NS_DICT, INSTANTIATE_NS_WITH_PNF_DICT
from lcm.pub.config import config
from lcm.pub.database.models import NSInstModel, WFPlanModel
from lcm.pub.utils import restcall
class TestNsInstant(TestCase):
    """Tests for NS instantiation via the REST endpoint and ``InstantNSService``.

    Every test starts with one ``NSInstModel`` row (id "2") and with
    ``lcm.pub.utils.idutil.get_auto_id`` patched to return ``1``. Both the
    ``get_auto_id`` patch and any ``config.WORKFLOW_OPTION`` override are
    undone automatically when the test finishes.
    """

    # Returned by the patched ``select_vnfm``. It has to be a class attribute:
    # the ``mock.patch`` decorators below read it while the class body is
    # being evaluated.
    vnfminfo = {"vnfmId": "1"}

    def setUp(self):
        """Create the API client, reset ``NSInstModel`` and seed NS instance "2"."""
        self.client = APIClient()
        NSInstModel.objects.filter().delete()
        self.url = "/api/nslcm/v1/ns/%s/instantiate" % "2"
        NSInstModel(id="2", nspackage_id="7", nsd_id="2", status="active").save()
        self._mock_get_auto_id()

    def tearDown(self):
        """Remove every NS instance and workflow plan row created by a test."""
        NSInstModel.objects.all().delete()
        WFPlanModel.objects.all().delete()

    def _mock_get_auto_id(self):
        """Patch ``get_auto_id`` to always return 1 for the duration of one test."""
        fake_auto_id = mock.Mock(return_value=1)
        # NOTE(review): only the target string of this call was visible in the
        # reviewed text; the surrounding patch()/start() lines are
        # reconstructed - confirm against version control.
        patcher = mock.patch('lcm.pub.utils.idutil.get_auto_id', fake_auto_id)
        patcher.start()
        # Without this the patch is never stopped and leaks into later tests.
        self.addCleanup(patcher.stop)

    def _use_workflow_option(self, option):
        """Set ``config.WORKFLOW_OPTION`` to ``option`` for the current test only.

        The previous value is restored (or the attribute removed again if it
        did not exist) when the test ends, so tests no longer leak this
        module-level setting into each other.
        """
        patcher = mock.patch.object(config, "WORKFLOW_OPTION", option, create=True)
        patcher.start()
        self.addCleanup(patcher.stop)

    @staticmethod
    def _answer_by_url(mock_call_req, responses):
        """Make ``mock_call_req`` reply from ``responses``, keyed by its fifth positional argument."""
        def side_effect(*args):
            # args[4] is the lookup key; presumably the resource path handed
            # to call_req - TODO confirm against restcall.call_req.
            return responses[args[4]]

        mock_call_req.side_effect = side_effect

    def _assert_grapflow_instantiation_succeeds(self, mock_call_req):
        """Run ``InstantNSService.do_biz`` with the "grapflow" option and expect HTTP 200.

        ``mock_call_req`` is primed with 19 identical successful responses
        whose body is the JSON-encoded ``JOB_DICT``.
        """
        self._use_workflow_option("grapflow")
        NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created").save()
        ret = [0, json.dumps(JOB_DICT), '200']
        mock_call_req.side_effect = [ret] * 19
        ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
        self.assertEqual(ack['status'], status.HTTP_200_OK)

    @mock.patch.object(restcall, 'call_req')
    @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps({"model": json.dumps(NSD_MODEL_DICT)})))
    @mock.patch.object(BuildInWorkflowThread, 'run')
    def test_ns_instantiate_when_succeed_to_enter_workflow(self, mock_run, mock_call_req):
        """POST with the "buildin" option returns 200 and a ``jobId``."""
        self._use_workflow_option("buildin")
        mock_call_req.side_effect = [
            [0, {"model": json.dumps(NSD_MODEL_DICT)}, '200'],
            # NOTE(review): the reviewed text is missing one line right here
            # (most likely another response entry). It is not guessed at;
            # restore it from version control.
            [0, VNFM_LIST_IN_AAI_DICT, '200'],
            [0, VNFM_IN_AAI_DICT, '200'],
        ]
        resp = self.client.post(self.url, data=INSTANTIATE_NS_DICT, format='json')
        self.assertEqual(status.HTTP_200_OK, resp.status_code)
        self.assertIn("jobId", resp.data)

    @mock.patch.object(InstantNSService, 'do_biz')
    def test_ns_instantiate_normal(self, mock_do_biz):
        """The view returns the status and data produced by ``do_biz`` unchanged."""
        mock_do_biz.return_value = dict(data=JOB_DICT, status=status.HTTP_200_OK)
        resp = self.client.post(self.url, data=INSTANTIATE_NS_DICT, format='json')
        self.assertEqual(status.HTTP_200_OK, resp.status_code)
        self.assertEqual(JOB_DICT, resp.data)

    @mock.patch.object(restcall, 'call_req')
    def test_ns_instantiate_when_fail_to_parse_nsd(self, mock_call_req):
        """A failing ``call_req`` yields HTTP 500 with an ``error`` field."""
        mock_call_req.return_value = [1, "Failed to parse nsd", '500']
        resp = self.client.post(self.url, data=INSTANTIATE_NS_DICT, format='json')
        self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
        self.assertIn("error", resp.data)

    @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal')
    @mock.patch.object(restcall, 'call_req')
    @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job')
    @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
    @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
    def test_ns_instantiate_with_pnf(self, mock_update_job, mock_call_req, mock_post_deal):
        """"grapflow" instantiation of an NSD with a PNF succeeds (flow hooks mocked)."""
        self._assert_grapflow_instantiation_succeeds(mock_call_req)

    @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal')
    @mock.patch.object(restcall, 'call_req')
    @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job')
    @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
    @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
    def test_ns_instantiate_with_pnf_when_wso2(self, mock_update_job, mock_call_req, mock_post_deal):
        """"wso2" instantiation succeeds when every REST call is answered per URL."""
        self._use_workflow_option("wso2")
        NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created", nsd_id="nsd_id").save()
        stpls = [{"id": "nsd_id", "csarId": "csar_id", "templateName": "template_name",
                  "serviceTemplateId": "service_template_id"}]
        items = [{"name": ["init"], "processId": "process_id"}]
        # NOTE(review): the definition of ``ret`` and the value for the
        # ns_descriptors key were not visible in the reviewed text. The values
        # used here are reconstructions - confirm against version control.
        ret = {"status": 1}
        mock_vals = {
            "/api/catalog/v1/servicetemplates":
                [0, json.dumps(stpls), "200"],
            "/api/catalog/v1/servicetemplates/service_template_id/operations":
                [0, json.dumps(items), "200"],
            "/api/wso2bpel/v1/process/instance":
                [0, json.dumps(ret), "200"],
            "/api/catalog/v1/ns_descriptors/1":
                [0, None, "200"],
        }
        self._answer_by_url(mock_call_req, mock_vals)
        ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
        self.assertEqual(ack['status'], status.HTTP_200_OK)

    @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal')
    @mock.patch.object(restcall, 'call_req')
    @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job')
    @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
    @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
    def test_ns_instantiate_with_pnf_when_activiti(self, mock_update_job, mock_call_req, mock_post_deal):
        """"activiti" instantiation succeeds with a stored workflow plan."""
        self._use_workflow_option("activiti")
        NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created", nsd_id="nsd_id").save()
        WFPlanModel(deployed_id="1", process_id="process_id", message="message", status=1, plan_name="plan_name").save()
        # NOTE(review): the definition of ``ret`` and the value for the
        # ns_descriptors key were not visible in the reviewed text. The values
        # used here are reconstructions - confirm against version control.
        ret = {"status": 1}
        mock_vals = {
            # This key has no leading slash in the original; kept verbatim.
            "api/workflow/v1/process/instance":
                [0, json.dumps(ret), "200"],
            "/api/catalog/v1/ns_descriptors/1":
                [0, None, "200"],
        }
        self._answer_by_url(mock_call_req, mock_vals)
        ack = InstantNSService(1, INSTANTIATE_NS_WITH_PNF_DICT).do_biz()
        self.assertEqual(ack['status'], status.HTTP_200_OK)

    @mock.patch.object(restcall, 'call_req')
    @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
    @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
    def test_ns_instantiate_with_vimid_1(self, mock_call_req):
        """"grapflow" instantiation succeeds without mocking the flow hooks."""
        self._assert_grapflow_instantiation_succeeds(mock_call_req)

    @mock.patch.object(restcall, 'call_req')
    @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=json.dumps(NSD_MODEL_WITH_PNF_DICT)))
    @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
    def test_ns_instantiate_with_different_vimid_2(self, mock_call_req):
        """Same scenario as ``test_ns_instantiate_with_vimid_1``; kept as a separate case."""
        self._assert_grapflow_instantiation_succeeds(mock_call_req)