ADD UT for ns_lcm Issue-ID: VFC-1429 Signed-off-by: zhuerlei <zhu.erlei@zte.com.cn>
[vfc/nfvo/lcm.git] / lcm / jobs / tests / tests.py
1 # Copyright 2016 ZTE Corporation.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #         http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from django.test import TestCase
16 from lcm.jobs.const import JOB_INSTANCE_URI
17 from lcm.jobs.enum import JOB_ACTION, JOB_STATUS, JOB_TYPE
18 from lcm.jobs.tests import UPDATE_JOB_DICT, UPDATE_JOB_BAD_REQ_DICT
19 from lcm.pub.database.models import JobModel, JobStatusModel
20 from rest_framework import status
21 from rest_framework.test import APIClient
22
23
24 class JobsViewTest(TestCase):
25     def setUp(self):
26         self.job_id = 'test_job_id'
27         self.client = APIClient()
28         JobModel.objects.all().delete()
29         JobStatusModel.objects.all().delete()
30
31     def tearDown(self):
32         JobModel.objects.all().delete()
33         JobStatusModel.objects.all().delete()
34
35     def test_query_ns_job(self):
36         JobModel(jobid=self.job_id,
37                  jobtype=JOB_TYPE.NS,
38                  jobaction=JOB_ACTION.INSTANTIATE,
39                  resid='1').save()
40         JobStatusModel(indexid=1,
41                        jobid=self.job_id,
42                        status=JOB_STATUS.PROCESSING,
43                        progress=20,
44                        descp='Finish to instantiate NS.',
45                        errcode="0").save()
46         response = self.client.get(JOB_INSTANCE_URI % self.job_id)
47         self.assertEqual(status.HTTP_200_OK, response.status_code, response.data)
48         self.assertIn('jobId', response.data)
49         self.assertIn('responseDescriptor', response.data)
50         self.assertEqual(20, response.data['responseDescriptor']['progress'])
51
52     def test_query_ns_job_not_existed(self):
53         job_id = 'test_job_id_not_existed'
54         response = self.client.get(JOB_INSTANCE_URI % job_id)
55         self.assertEqual(status.HTTP_200_OK, response.status_code)
56         self.assertIn('jobId', response.data)
57         self.assertNotIn('responseDescriptor', response.data)
58
59     def test_query_ns_job_with_error_response_id(self):
60         url = JOB_INSTANCE_URI % self.job_id + "?responseId='zzz'"
61         response = self.client.get(url)
62         self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, response.status_code)
63
64     def test_query_job_with_response_id(self):
65         JobModel(jobid=self.job_id,
66                  jobtype=JOB_TYPE.NS,
67                  jobaction=JOB_ACTION.INSTANTIATE,
68                  resid='1').save()
69         JobStatusModel(indexid=1,
70                        jobid=self.job_id,
71                        status=JOB_STATUS.PROCESSING,
72                        progress=20,
73                        descp='NS instantiation progress is 20%.',
74                        errcode="0").save()
75         JobStatusModel(indexid=2,
76                        jobid=self.job_id,
77                        status=JOB_STATUS.PROCESSING,
78                        progress=50,
79                        descp='NS instantiation progress is 50%.',
80                        errcode="0").save()
81         JobStatusModel(indexid=3,
82                        jobid=self.job_id,
83                        status=JOB_STATUS.PROCESSING,
84                        progress=80,
85                        descp='NS instantiation progress is 80%.',
86                        errcode="0").save()
87         JobStatusModel(indexid=4,
88                        jobid=self.job_id,
89                        status=JOB_STATUS.FINISH,
90                        progress=100,
91                        descp='Finish to instantiate NS.',
92                        errcode="0").save()
93         url = JOB_INSTANCE_URI % self.job_id + "?responseId=2"
94         response = self.client.get(url)
95         self.assertEqual(status.HTTP_200_OK, response.status_code)
96         self.assertEqual(self.job_id, response.data.get('jobId'))
97         self.assertIn('responseDescriptor', response.data)
98         self.assertEqual(100, response.data['responseDescriptor']['progress'])
99         self.assertEqual(1, len(response.data['responseDescriptor']['responseHistoryList']))
100
101     def test_update_job(self):
102         JobModel(
103             jobid=self.job_id,
104             jobtype=JOB_TYPE.NS,
105             jobaction=JOB_ACTION.INSTANTIATE,
106             resid='1').save()
107         JobStatusModel(
108             indexid=1,
109             jobid=self.job_id,
110             status=JOB_STATUS.PROCESSING,
111             progress=20,
112             descp='NS instantiation progress is 20%.',
113             errcode="0").save()
114         response = self.client.post(JOB_INSTANCE_URI % self.job_id, format='json', data=UPDATE_JOB_DICT)
115         self.assertEqual(status.HTTP_202_ACCEPTED, response.status_code)
116
117     def test_update_job_not_existed(self):
118         response = self.client.post(JOB_INSTANCE_URI % self.job_id, format='json', data=UPDATE_JOB_DICT)
119         self.assertEqual(status.HTTP_400_BAD_REQUEST, response.status_code)
120
121     def test_update_job_with_bad_req(self):
122         JobModel(
123             jobid=self.job_id,
124             jobtype=JOB_TYPE.NS,
125             jobaction=JOB_ACTION.INSTANTIATE,
126             resid='1').save()
127         JobStatusModel(
128             indexid=1,
129             jobid=self.job_id,
130             status=JOB_STATUS.PROCESSING,
131             progress=20,
132             descp='NS instantiation progress is 20%.',
133             errcode="0").save()
134         response = self.client.post(JOB_INSTANCE_URI % self.job_id, format='json', data=UPDATE_JOB_BAD_REQ_DICT)
135         self.assertEqual(status.HTTP_400_BAD_REQUEST, response.status_code)