5 from django.test import TestCase
6 from rest_framework.test import APIClient
8 from lcm.jobs.enum import JOB_TYPE, JOB_ACTION, JOB_PROGRESS
9 from lcm.ns.biz.ns_terminate import TerminateNsService
10 from lcm.ns.enum import OWNER_TYPE
11 from lcm.ns.tests import TERMINATE_NS_DICT
12 from lcm.pub.database.models import NSInstModel, JobModel, FPInstModel, NfInstModel, VLInstModel, PNFInstModel
13 from lcm.pub.utils.jobutil import JobUtil
class TestScaleAspect(TestCase):
    """Exercise ``TerminateNsService.run()`` against mocked HTTP responses.

    Each test creates (or reuses) a terminate job, runs the service and then
    asserts that the stored job progress equals ``JOB_PROGRESS.FINISHED``.

    NOTE(review): the class name says "ScaleAspect" while every test here
    targets NS termination; the name is left untouched so existing test
    selectors keep working.
    """

    def setUp(self):
        """Create the NS instances, related records and job shared by the tests."""
        self.client = APIClient()
        self.ns_inst_id = 'test_ns_terminate_001'
        self.url = '/api/nslcm/v1/ns/%s/terminate' % self.ns_inst_id
        self.job_id = JobUtil.create_job(JOB_TYPE.NS, JOB_ACTION.TERMINATE, self.ns_inst_id)
        NSInstModel(id=self.ns_inst_id).save()
        # Second NS instance that gets no FP/NF/VL/PNF records attached.
        NSInstModel(id='test_ns_terminate_002').save()
        FPInstModel(nsinstid=self.ns_inst_id, sfcid='test_sfc_inst_001', fpname='xxx', status='zzz').save()
        NfInstModel(ns_inst_id=self.ns_inst_id).save()
        VLInstModel(ownertype=OWNER_TYPE.NS, ownerid=self.ns_inst_id).save()
        PNFInstModel(nsInstances=self.ns_inst_id).save()

    def tearDown(self):
        """Delete every row of the models touched by these tests."""
        touched_models = (
            NSInstModel,
            FPInstModel,
            NfInstModel,
            VLInstModel,
            PNFInstModel,
            JobModel,
        )
        for model in touched_models:
            model.objects.all().delete()

    @staticmethod
    def _response(status, body):
        """Return one mocked ``httplib2.Http.request`` result.

        :param status: HTTP status as a string, e.g. ``'200'``.
        :param body: response payload as text; it is UTF-8 encoded here.
        :return: ``({'status': status}, encoded_body)`` tuple.
        """
        return {'status': status}, body.encode('utf-8')

    @classmethod
    def _progress_response(cls, progress):
        """Return a status-200 response whose ``responseDescriptor`` reports ``progress``."""
        body = (
            '{"responseDescriptor": {"progress": %d, "responseId": 1, '
            '"statusDescription": ""}}'
        ) % progress
        return cls._response('200', body)

    @staticmethod
    def _create_ns_with_vnf(ns_instance_id):
        """Persist an NS instance plus one NF instance that belongs to it."""
        NSInstModel(id=ns_instance_id).save()
        NfInstModel(ns_inst_id=ns_instance_id).save()

    def _terminate_and_assert_finished(self, ns_instance_id, job_id=None):
        """Run the terminate service and check that the job ended as FINISHED.

        :param ns_instance_id: id of the NS instance to terminate.
        :param job_id: existing job to reuse; a new terminate job is created
            for ``ns_instance_id`` when omitted.
        """
        if job_id is None:
            job_id = JobUtil.create_job(JOB_TYPE.NS, JOB_ACTION.TERMINATE, ns_instance_id)
        TerminateNsService(ns_instance_id, job_id, TERMINATE_NS_DICT).run()
        self.assertEqual(JobModel.objects.filter(jobid=job_id).first().progress, JOB_PROGRESS.FINISHED)

    def test_ns_terminate_asynchronous_tasks_when_ns_does_not_exist(self):
        """Terminating an NS id that was never saved still finishes the job."""
        self._terminate_and_assert_finished('test_ns_terminate_not_exist')

    def test_ns_terminate_asynchronous_tasks_with_none(self):
        """Terminating the NS that setUp saved without related records finishes the job."""
        self._terminate_and_assert_finished('test_ns_terminate_002')

    @mock.patch.object(time, 'sleep')
    @mock.patch.object(httplib2.Http, 'request')
    def test_ns_terminate_asynchronous_tasks(self, mock_request, mock_sleep):
        """Terminate the fully populated NS from setUp with five successful responses queued."""
        mock_request.side_effect = [
            self._response('200', '{"result": "0"}'),
            self._response('200', '{"jobId": "zzz"}'),
            self._progress_response(100),
            self._response('200', '{"result": "0"}'),
            self._response('200', '{"result": "0"}'),
        ]
        mock_sleep.return_value = None

        self._terminate_and_assert_finished(self.ns_inst_id, self.job_id)

    @mock.patch.object(time, 'sleep')
    @mock.patch.object(httplib2.Http, 'request')
    def test_ns_terminate_asynchronous_tasks_when_cancel_vnf_no_vnfjobid(self, mock_request, mock_sleep):
        """The only queued response carries an empty ``jobId``; the job must still finish."""
        ns_instance_id = 'test_ns_terminate_when_cancel_vnf_no_vnfjobid'
        self._create_ns_with_vnf(ns_instance_id)
        mock_request.side_effect = [
            self._response('200', '{"jobId": ""}'),
        ]
        mock_sleep.return_value = None

        self._terminate_and_assert_finished(ns_instance_id)

    @mock.patch.object(time, 'sleep')
    @mock.patch.object(httplib2.Http, 'request')
    def test_ns_terminate_asynchronous_tasks_when_terminate_vnf_failed(self, mock_request, mock_sleep):
        """The only queued response has status 404; the job must still finish."""
        ns_instance_id = 'test_ns_terminate_when_terminate_vnf_failed'
        self._create_ns_with_vnf(ns_instance_id)
        mock_request.side_effect = [
            self._response('404', '{"jobId": "zzz"}'),
        ]
        mock_sleep.return_value = None

        self._terminate_and_assert_finished(ns_instance_id)

    @mock.patch.object(time, 'sleep')
    @mock.patch.object(httplib2.Http, 'request')
    def test_ns_terminate_asynchronous_tasks_when_failed_to_query_job(self, mock_request, mock_sleep):
        """A job id is returned, followed by a progress-100 job response."""
        ns_instance_id = 'test_ns_terminate_when_failed_to_query_job'
        self._create_ns_with_vnf(ns_instance_id)
        # NOTE(review): no failing job-query response is queued here even
        # though the test name refers to a failed query -- confirm the
        # intended response sequence.
        mock_request.side_effect = [
            self._response('200', '{"jobId": "zzz"}'),
            self._progress_response(100),
        ]
        mock_sleep.return_value = None

        self._terminate_and_assert_finished(ns_instance_id)

    @mock.patch.object(time, 'sleep')
    @mock.patch.object(httplib2.Http, 'request')
    def test_ns_terminate_asynchronous_tasks_when_no_new_progress(self, mock_request, mock_sleep):
        """An empty JSON object is returned once before the progress-100 response."""
        ns_instance_id = 'test_ns_terminate_when_no_new_progress'
        self._create_ns_with_vnf(ns_instance_id)
        mock_request.side_effect = [
            self._response('200', '{"jobId": "zzz"}'),
            self._response('200', '{}'),
            self._progress_response(100),
        ]
        mock_sleep.return_value = None

        self._terminate_and_assert_finished(ns_instance_id)

    @mock.patch.object(time, 'sleep')
    @mock.patch.object(httplib2.Http, 'request')
    def test_ns_terminate_asynchronous_tasks_when_job_failed(self, mock_request, mock_sleep):
        """The job response reports progress 255; the NS job must still finish."""
        ns_instance_id = 'test_ns_terminate_when_job_failed'
        self._create_ns_with_vnf(ns_instance_id)
        # presumably 255 is the downstream "job failed" marker -- verify
        # against the job progress constants.
        mock_request.side_effect = [
            self._response('200', '{"jobId": "zzz"}'),
            self._progress_response(255),
        ]
        mock_sleep.return_value = None

        self._terminate_and_assert_finished(ns_instance_id)