).save()
JobUtil.clear_job(job_id)
self.assertEqual(0, len(JobModel.objects.filter(jobid=job_id)))
+
+ def test_add_job_status_when_job_is_not_created(self):
+ JobModel.objects.filter().delete()
+ self.assertRaises(
+ Exception,
+ JobUtil.add_job_status,
+ job_id="1",
+ progress=1,
+ status_decs="2",
+ error_code="0"
+ )
+
+ def test_add_job_status_normal(self):
+ job_id = "1"
+ JobModel.objects.filter().delete()
+ JobStatusModel.objects.filter().delete()
+ JobModel(
+ jobid=job_id,
+ jobtype="1",
+ jobaction="2",
+ resid="3",
+ status=0
+ ).save()
+ JobUtil.add_job_status(
+ job_id="1",
+ progress=1,
+ status_decs="2",
+ error_code="0"
+ )
+ self.assertEqual(1, len(JobStatusModel.objects.filter(jobid=job_id)))
+ JobStatusModel.objects.filter().delete()
+ JobModel.objects.filter().delete()