+ @NotNull
+ protected List<JobDaoImpl> addJobsWithModifiedDate(List<Jobber> jobbers, JobsBrokerService broker) {
+ final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
+ return addJobsWithModifiedDateByJobDao(jobs, broker);
+ }
+
+ @NotNull
+ private List<JobDaoImpl> addJobsWithModifiedDateByJobDao(List<JobDaoImpl> jobs, JobsBrokerService broker) {
+ for (JobDaoImpl job : jobs) {
+ Date modifiedDate = job.getModified();
+ broker.add(job);
+ setModifiedDateToJob(job.getUuid(), modifiedDate);
+ }
+ return jobs;
+ }
+
+ @DataProvider
+ public static Object[][] jobsForTestingPendingResource(Method test) {
+ UUID templateId1 = UUID.fromString("311a9196-bbc5-47a1-8b11-bf0f9db1c7ca");
+ UUID templateId2 = UUID.fromString("4f1522f9-642e-49f7-af75-a2f344085bcc");
+ return new Object[][]{
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(12, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(1, templateId2, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(2, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(3, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false)
+ ),
+ 0,
+ "given there is only one in the queue in PENDING_RESOURCE and no other job with same templateId, then this job is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(3, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(2), false),
+ () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
+ ),
+ 2,
+ "given multiple jobs with same templateId in PENDING_RESOURCE, then job with lowest indexInBulk is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(1, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
+ ),
+ 1,
+ "given multiple jobs with same indexInBulk, then job with lowest templateId is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
+ () -> createNewJob(2, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false)
+ ),
+ 0,
+ "given multiple jobs with different indexInBulk and different templateId, then job with lowest indexInBulk is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+ () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false)
+ ),
+ -1,
+ "given there is already taken job with same templateId, then no job is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+ () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false),
+ () -> createNewJob(9, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
+ () -> createNewJob(8, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
+ ),
+ 3,
+ "given 4 jobs, 2 jobs templateId1 but one of them is taken, and 2 jobs with templateId2, then select job with templateId2"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+ () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), true)
+ ),
+ 0,
+ "given 2 jobs with same templateId, one of them is taken but deleted, then the other job is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+ () -> createNewJob(1, templateId1, "userId", IN_PROGRESS, null, LocalDateTime.now(), false)
+ ),
+ -1,
+ "given 2 jobs with same templateId, one of them is IN_PROGRESS, then no job is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+ () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false)
+ ),
+ -1,
+ "given 2 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS, then no job is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+ () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+ () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), true)
+ ),
+ 1,
+ "given 3 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS but deleted, then other job with lowest indexInBulk is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+ () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+ () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false),
+ () -> createNewJob(12, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+ () -> createNewJob(11, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
+ ),
+ 4,
+ "given 5 jobs, 3 with templateId1 that one of them is RESOURCE_IN_PROGRESS,"+
+ "2 with templateId2 both in PENDING_RESOURCE, then job with lowest indexInBulk from templateId2 is selected"
+
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), true)
+ ),
+ -1,
+ "given 1 job in PENDING_RESOURCE but it's deleted, then no job is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(20, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(1, templateId1, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(2, templateId1, "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(3, templateId1, "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(4, templateId1, "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(5, templateId1, "userId", STOPPED, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(6, templateId1, "userId", PAUSE, null, LocalDateTime.now().minusSeconds(1), false)
+ ),
+ 0,
+ "given multiple jobs with same templateId, 1 in PENDING_RESOURCE, and other are not in progress, "+
+ "then the job in PENDING_RESOURCE is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(1, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(2, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(3, UUID.randomUUID(), "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(5, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
+ () -> createNewJob(6, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false)
+ ),
+ -1,
+ "given there is no job in PENDING_RESOURCE state, then no job is selected"
+ },
+ {ImmutableList.of( (Jobber)
+ () -> createNewJob(6, null, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false)
+ ),
+ -1,
+ "given there is 1 job in PENDING_RESOURCE but without templateId, then no job is selected"
+ },
+ };
+ }
+
+ @Test(dataProvider = "jobsForTestingPendingResource")
+ public void givenSomeJobs_pullPendingResource_returnNextOrNothingAsExpected(List<Jobber> jobbers, int expectedIndexSelected, String assertionReason) {
+ givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(jobbers, 1, expectedIndexSelected, PENDING_RESOURCE, assertionReason);
+ }
+
+ public static JobDaoImpl createNewJob(Job.JobStatus status, String takenBy, long secondsOffset, boolean deleted) {
+ return createNewJob(1, UUID.randomUUID(), "af456", status, takenBy, LocalDateTime.now().minusSeconds(secondsOffset), deleted);
+ }
+
+ @Test
+ public void givenSomeJobs_deleteOldFinalJobs_onlyExpectedJobsAreDeleted() {
+ long seconds = 999;
+ final List<Pair<JobDaoImpl,Boolean>> jobs = ImmutableList.of(
+ //not final
+ Pair.of(createNewJob(IN_PROGRESS, null, seconds+1, false), true),
+ Pair.of(createNewJob(RESOURCE_IN_PROGRESS, null, seconds+1, false), true),
+ Pair.of(createNewJob(PENDING, null, seconds+1, false), true),
+ Pair.of(createNewJob(CREATING, null, seconds+1, false), true),
+ Pair.of(createNewJob(PENDING_RESOURCE, null, seconds+1, false), true),
+ Pair.of(createNewJob(PAUSE, null, seconds+1, false), true),
+
+ //final
+ Pair.of(createNewJob(COMPLETED, null, seconds+1, false), false),
+ Pair.of(createNewJob(FAILED, null, seconds+1, false), false),
+ Pair.of(createNewJob(STOPPED, null, seconds+1, false), false),
+ Pair.of(createNewJob(COMPLETED_WITH_ERRORS, null, seconds+1, true), false),
+ Pair.of(createNewJob(COMPLETED_WITH_NO_ACTION, generateRandomAlphaNumeric(5), seconds+1, true), false),
+
+ //final but not old
+ Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-2, false), true),
+ Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-400, false), true),
+ Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), 0, false), true)
+ );
+ addJobsWithModifiedDateByJobDao(jobs.stream().map(Pair::getLeft).collect(Collectors.toList()), broker);
+ assertEquals(jobs.size(), broker.peek().size());
+
+ broker.deleteOldFinalJobs(seconds);
+ Stream<Pair<UUID, Job.JobStatus>> expectedJobs = jobs.stream()
+ .filter(Pair::getRight)
+ .map(x -> Pair.of(
+ x.getLeft().getUuid(),
+ x.getLeft().getStatus()
+ ));
+ assertThat(broker.peek().stream().map(x->Pair.of(x.getUuid(), x.getStatus())).collect(Collectors.toList()),
+ containsInAnyOrder(expectedJobs.toArray()));
+ }
+