a64a7f3d2aabc442f6b88903c95f9e4ec734ab6c
[vfc/gvnfm/vnflcm.git] / lcm / lcm / nf / tests / test_terminate_vnf.py
1 # Copyright 2017 ZTE Corporation.\r
2 #\r
3 # Licensed under the Apache License, Version 2.0 (the "License");\r
4 # you may not use this file except in compliance with the License.\r
5 # You may obtain a copy of the License at\r
6 #\r
7 #         http://www.apache.org/licenses/LICENSE-2.0\r
8 #\r
9 # Unless required by applicable law or agreed to in writing, software\r
10 # distributed under the License is distributed on an "AS IS" BASIS,\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
12 # See the License for the specific language governing permissions and\r
13 # limitations under the License.\r
14 \r
15 \r
16 import json\r
17 import uuid\r
18 \r
19 import mock\r
20 from django.test import TestCase, Client\r
21 from rest_framework import status\r
22 \r
23 from lcm.nf.biz.terminate_vnf import TerminateVnf\r
24 from lcm.pub.database.models import NfInstModel, JobStatusModel, VmInstModel, NetworkInstModel, SubNetworkInstModel, \\r
25     PortInstModel, FlavourInstModel, StorageInstModel, SubscriptionModel\r
26 from lcm.pub.utils import restcall\r
27 from lcm.pub.utils.jobutil import JobUtil\r
28 from lcm.pub.utils.timeutil import now_time\r
29 from lcm.pub.utils.notificationsutil import NotificationsUtil\r
30 from lcm.pub.vimapi import api\r
31 \r
32 \r
33 class TestNFTerminate(TestCase):\r
34     def setUp(self):\r
35         self.client = Client()\r
36         StorageInstModel.objects.create(storageid="1",\r
37                                         vimid="1",\r
38                                         resourceid="11",\r
39                                         insttype=0,\r
40                                         instid="1111",\r
41                                         is_predefined=1)\r
42         NetworkInstModel.objects.create(networkid='1',\r
43                                         vimid='1',\r
44                                         resourceid='1',\r
45                                         name='pnet_network',\r
46                                         is_predefined=1,\r
47                                         tenant='admin',\r
48                                         insttype=0,\r
49                                         instid='1111')\r
50         SubNetworkInstModel.objects.create(subnetworkid='1',\r
51                                            vimid='1',\r
52                                            resourceid='1',\r
53                                            networkid='1',\r
54                                            is_predefined=1,\r
55                                            name='sub_pnet',\r
56                                            tenant='admin',\r
57                                            insttype=0,\r
58                                            instid='1111')\r
59         PortInstModel.objects.create(portid='1',\r
60                                      networkid='1',\r
61                                      subnetworkid='1',\r
62                                      vimid='1',\r
63                                      resourceid='1',\r
64                                      is_predefined=1,\r
65                                      name='aaa_pnet_cp',\r
66                                      tenant='admin',\r
67                                      insttype=0,\r
68                                      instid='1111')\r
69         FlavourInstModel.objects.create(flavourid="1",\r
70                                         vimid="1",\r
71                                         resourceid="11",\r
72                                         instid="1111",\r
73                                         is_predefined=1)\r
74         VmInstModel.objects.create(vmid="1",\r
75                                    vimid="1",\r
76                                    resourceid="11",\r
77                                    insttype=0,\r
78                                    instid="1111",\r
79                                    vmname="test_01",\r
80                                    is_predefined=1,\r
81                                    operationalstate=1)\r
82 \r
83     def tearDown(self):\r
84         VmInstModel.objects.all().delete()\r
85         NetworkInstModel.objects.all().delete()\r
86         SubNetworkInstModel.objects.all().delete()\r
87         PortInstModel.objects.all().delete()\r
88         NfInstModel.objects.all().delete()\r
89 \r
90     def assert_job_result(self, job_id, job_progress, job_detail):\r
91         jobs = JobStatusModel.objects.filter(jobid=job_id,\r
92                                              progress=job_progress,\r
93                                              descp=job_detail)\r
94         self.assertEqual(1, len(jobs))\r
95 \r
96     @mock.patch.object(TerminateVnf, 'run')\r
97     def test_terminate_vnf(self, mock_run):\r
98         req_data = {\r
99             "terminationType": "GRACEFUL",\r
100             "gracefulTerminationTimeout": 120\r
101         }\r
102         NfInstModel(nfinstid='12',\r
103                     nf_name='VNF1',\r
104                     nf_desc="VNF DESC",\r
105                     vnfdid="1",\r
106                     netype="XGW",\r
107                     vendor="ZTE",\r
108                     vnfSoftwareVersion="V1",\r
109                     version="V1",\r
110                     package_id="2",\r
111                     status='INSTANTIATED').save()\r
112         mock_run.re.return_value = None\r
113         response = self.client.post("/api/vnflcm/v1/vnf_instances/12/terminate", data=req_data, format='json')\r
114         self.failUnlessEqual(status.HTTP_202_ACCEPTED, response.status_code, response.content)\r
115 \r
116     @mock.patch.object(TerminateVnf, 'run')\r
117     def test_terminate_vnf_not_found(self, mock_run):\r
118         req_data = {\r
119             "terminationType": "GRACEFUL",\r
120             "gracefulTerminationTimeout": 120\r
121         }\r
122         mock_run.re.return_value = None\r
123         response = self.client.post("/api/vnflcm/v1/vnf_instances/567/terminate", data=req_data, format='json')\r
124         self.failUnlessEqual(status.HTTP_404_NOT_FOUND, response.status_code, response.content)\r
125 \r
126     @mock.patch.object(TerminateVnf, 'run')\r
127     def test_terminate_vnf_conflict(self, mock_run):\r
128         req_data = {\r
129             "terminationType": "GRACEFUL",\r
130             "gracefulTerminationTimeout": 120\r
131         }\r
132         NfInstModel(nfinstid='123',\r
133                     nf_name='VNF1',\r
134                     nf_desc="VNF DESC",\r
135                     vnfdid="1",\r
136                     netype="XGW",\r
137                     vendor="ZTE",\r
138                     vnfSoftwareVersion="V1",\r
139                     version="V1",\r
140                     package_id="2",\r
141                     status='NOT_INSTANTIATED').save()\r
142         mock_run.re.return_value = None\r
143         response = self.client.post("/api/vnflcm/v1/vnf_instances/123/terminate", data=req_data, format='json')\r
144         self.failUnlessEqual(status.HTTP_409_CONFLICT, response.status_code, response.content)\r
145 \r
146     def test_terminate_vnf_when_inst_id_not_exist(self):\r
147         data = {\r
148             "terminationType": "GRACEFUL",\r
149             "gracefulTerminationTimeout": 120\r
150         }\r
151         self.nf_inst_id = str(uuid.uuid4())\r
152         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
153         JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")\r
154         TerminateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()\r
155         self.assert_job_result(self.job_id, 100, "Terminate Vnf success.")\r
156 \r
157     @mock.patch.object(restcall, 'call_req')\r
158     @mock.patch.object(api, 'call')\r
159     @mock.patch.object(NotificationsUtil, 'post_notification')\r
160     def test_terminate_vnf_success(self, mock_post_notification, mock_call, mock_call_req):\r
161         NfInstModel.objects.create(nfinstid='1111',\r
162                                    nf_name='2222',\r
163                                    vnfminstid='1',\r
164                                    package_id='todo',\r
165                                    version='',\r
166                                    vendor='',\r
167                                    netype='',\r
168                                    vnfd_model='',\r
169                                    status='VNF_INSTANTIATED',\r
170                                    nf_desc='',\r
171                                    vnfdid='',\r
172                                    vnfSoftwareVersion='',\r
173                                    vnfConfigurableProperties='todo',\r
174                                    localizationLanguage='EN_US',\r
175                                    create_time=now_time())\r
176 \r
177         SubscriptionModel.objects.create(\r
178             subscription_id=str(uuid.uuid4()),\r
179             callback_uri='api/gvnfmdriver/v1/vnfs/lifecyclechangesnotification',\r
180             auth_info=json.JSONEncoder().encode({\r
181                 'authType': ['BASIC'],\r
182                 'paramsBasic': {\r
183                     'userName': 'username',\r
184                     'password': 'password'\r
185                 }\r
186             }),\r
187             notification_types=str([\r
188                 'VnfLcmOperationOccurrenceNotification',\r
189                 'VnfIdentifierCreationNotification',\r
190                 'VnfIdentifierDeletionNotification'\r
191             ]),\r
192             operation_types=str(['TERMINATE']),\r
193             operation_states=str(['COMPLETED']),\r
194             vnf_instance_filter=json.JSONEncoder().encode({\r
195                 'vnfdIds': ['111'],\r
196                 'vnfProductsFromProviders': [],\r
197                 'vnfInstanceIds': ['1111'],\r
198                 'vnfInstanceNames': [],\r
199             })\r
200         )\r
201 \r
202         t1_apply_grant_result = [0, json.JSONEncoder().encode(\r
203             {\r
204                 "id": "1",\r
205                 "vnfInstanceId": "1",\r
206                 "vnfLcmOpOccId": "2",\r
207                 "vimConnections": [\r
208                     {\r
209                         "id": "1",\r
210                         "vimId": "1"\r
211                     }\r
212                 ]\r
213             }), '200']\r
214         t2_lcm_notify_result = [0, json.JSONEncoder().encode(''), '200']\r
215         t3_delete_flavor = [0, json.JSONEncoder().encode({"vim_id": "vimid_1"}), '200']\r
216         mock_call_req.side_effect = [t1_apply_grant_result, t2_lcm_notify_result, t3_delete_flavor]\r
217         mock_call.return_value = None\r
218         mock_post_notification.return_value = None\r
219         data = {\r
220             "terminationType": "FORCEFUL",\r
221             "gracefulTerminationTimeout": 120\r
222         }\r
223         self.nf_inst_id = '1111'\r
224         self.job_id = JobUtil.create_job('NF', 'CREATE', self.nf_inst_id)\r
225         JobUtil.add_job_status(self.job_id, 0, "INST_VNF_READY")\r
226         TerminateVnf(data, nf_inst_id=self.nf_inst_id, job_id=self.job_id).run()\r
227         self.assert_job_result(self.job_id, 100, "Terminate Vnf success.")\r