1 # Copyright 2016 ZTE Corporation.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from django.test import TestCase
16 from lcm.jobs.const import JOB_INSTANCE_URI
17 from lcm.jobs.enum import JOB_ACTION, JOB_STATUS, JOB_TYPE
18 from lcm.jobs.tests import UPDATE_JOB_DICT, UPDATE_JOB_BAD_REQ_DICT
19 from lcm.pub.database.models import JobModel, JobStatusModel
20 from rest_framework import status
21 from rest_framework.test import APIClient
# Test case for the job endpoint addressed by JOB_INSTANCE_URI, driven through
# a DRF APIClient stored on the test instance.
24 class JobsViewTest(TestCase):
# NOTE(review): the statements below use `self`, but no enclosing `def` line is
# visible in this view; presumably they are the bodies of setUp and tearDown
# whose header lines were lost -- confirm against the original file.
# Fixed job id shared by the tests, plus the HTTP client used for every request.
26 self.job_id = 'test_job_id'
27 self.client = APIClient()
# Remove every JobModel / JobStatusModel row so each test starts from empty tables.
28 JobModel.objects.all().delete()
29 JobStatusModel.objects.all().delete()
# Same cleanup repeated (presumably the tearDown body -- confirm).
32 JobModel.objects.all().delete()
33 JobStatusModel.objects.all().delete()
# Query a job by id and check the shape of the returned payload.
# NOTE(review): the JobModel(...) and JobStatusModel(...) calls below are not
# closed in this view; their remaining keyword arguments and whatever persists
# them are not visible here -- confirm against the original file.
35 def test_query_ns_job(self):
36 JobModel(jobid=self.job_id,
38 jobaction=JOB_ACTION.INSTANTIATE,
40 JobStatusModel(indexid=1,
42 status=JOB_STATUS.PROCESSING,
44 descp='Finish to instantiate NS.',
# GET the job resource for the shared job id.
46 response = self.client.get(JOB_INSTANCE_URI % self.job_id)
# response.data is passed as the assertion message so a failure shows the payload.
47 self.assertEqual(status.HTTP_200_OK, response.status_code, response.data)
48 self.assertIn('jobId', response.data)
49 self.assertIn('responseDescriptor', response.data)
# The expected progress of 20 presumably mirrors a `progress` value given to
# JobStatusModel on a line not visible here -- TODO confirm.
50 self.assertEqual(20, response.data['responseDescriptor']['progress'])
def test_query_ns_job_not_existed(self):
    # Query an id this test never stores: the endpoint is expected to answer
    # 200 with a 'jobId' key but without a 'responseDescriptor' key.
    unknown_job_url = JOB_INSTANCE_URI % 'test_job_id_not_existed'
    resp = self.client.get(unknown_job_url)

    self.assertEqual(status.HTTP_200_OK, resp.status_code)

    payload = resp.data
    self.assertIn('jobId', payload)
    self.assertNotIn('responseDescriptor', payload)
def test_query_ns_job_with_error_response_id(self):
    # The responseId query value is the quoted text 'zzz' (quotes included);
    # the endpoint is expected to answer 500 for it.
    bad_query = "?responseId='zzz'"
    resp = self.client.get(JOB_INSTANCE_URI % self.job_id + bad_query)
    self.assertEqual(status.HTTP_500_INTERNAL_SERVER_ERROR, resp.status_code)
# Query a job with a numeric responseId after declaring four status entries
# (indexid 1 through 4; the first three PROCESSING, the last FINISH).
# NOTE(review): none of the model constructor calls below is closed in this
# view; their remaining keyword arguments and whatever persists them are not
# visible here -- confirm against the original file.
64 def test_query_job_with_response_id(self):
65 JobModel(jobid=self.job_id,
67 jobaction=JOB_ACTION.INSTANTIATE,
69 JobStatusModel(indexid=1,
71 status=JOB_STATUS.PROCESSING,
73 descp='NS instantiation progress is 20%.',
75 JobStatusModel(indexid=2,
77 status=JOB_STATUS.PROCESSING,
79 descp='NS instantiation progress is 50%.',
81 JobStatusModel(indexid=3,
83 status=JOB_STATUS.PROCESSING,
85 descp='NS instantiation progress is 80%.',
87 JobStatusModel(indexid=4,
89 status=JOB_STATUS.FINISH,
91 descp='Finish to instantiate NS.',
# Ask for the job with responseId=2 appended as a query string.
93 url = JOB_INSTANCE_URI % self.job_id + "?responseId=2"
94 response = self.client.get(url)
95 self.assertEqual(status.HTTP_200_OK, response.status_code)
96 self.assertEqual(self.job_id, response.data.get('jobId'))
97 self.assertIn('responseDescriptor', response.data)
# Expected progress is 100 and the history list holds exactly one entry.
# Presumably the view reports the newest status as the descriptor and lists
# only older entries with an index above responseId -- TODO confirm in the view.
98 self.assertEqual(100, response.data['responseDescriptor']['progress'])
99 self.assertEqual(1, len(response.data['responseDescriptor']['responseHistoryList']))
# POST UPDATE_JOB_DICT as JSON to the job resource and expect 202 Accepted.
101 def test_update_job(self):
# NOTE(review): the lines that open the calls these keyword arguments belong
# to are not visible in this view; presumably JobModel(...) and
# JobStatusModel(...) fixtures as in the sibling tests -- confirm.
105 jobaction=JOB_ACTION.INSTANTIATE,
110 status=JOB_STATUS.PROCESSING,
112 descp='NS instantiation progress is 20%.',
114 response = self.client.post(JOB_INSTANCE_URI % self.job_id, format='json', data=UPDATE_JOB_DICT)
115 self.assertEqual(status.HTTP_202_ACCEPTED, response.status_code)
def test_update_job_not_existed(self):
    # This test creates no job row before posting the update; the endpoint is
    # expected to reject the request with 400.
    target_url = JOB_INSTANCE_URI % self.job_id
    resp = self.client.post(target_url, data=UPDATE_JOB_DICT, format='json')
    self.assertEqual(status.HTTP_400_BAD_REQUEST, resp.status_code)
# POST UPDATE_JOB_BAD_REQ_DICT as JSON to the job resource and expect 400.
121 def test_update_job_with_bad_req(self):
# NOTE(review): the lines that open the calls these keyword arguments belong
# to are not visible in this view; presumably JobModel(...) and
# JobStatusModel(...) fixtures as in the sibling tests -- confirm.
125 jobaction=JOB_ACTION.INSTANTIATE,
130 status=JOB_STATUS.PROCESSING,
132 descp='NS instantiation progress is 20%.',
134 response = self.client.post(JOB_INSTANCE_URI % self.job_id, format='json', data=UPDATE_JOB_BAD_REQ_DICT)
135 self.assertEqual(status.HTTP_400_BAD_REQUEST, response.status_code)