Implant vid-app-common org.onap.vid.job (main and test)
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
index 85cf23e..40546e9 100644 (file)
@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 package org.onap.vid.services;
 
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.stream.Collectors.toList;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.both;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.onap.vid.job.Job.JobStatus.COMPLETED;
-import static org.onap.vid.job.Job.JobStatus.CREATING;
-import static org.onap.vid.job.Job.JobStatus.FAILED;
-import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
-import static org.onap.vid.job.Job.JobStatus.PAUSE;
-import static org.onap.vid.job.Job.JobStatus.PENDING;
-import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
-import static org.onap.vid.job.Job.JobStatus.STOPPED;
-import static org.onap.vid.utils.Streams.not;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.AssertJUnit.assertEquals;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import java.lang.reflect.Method;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-import javax.inject.Inject;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.hibernate.SessionFactory;
+import org.jetbrains.annotations.NotNull;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.onap.portalsdk.core.domain.support.DomainVo;
 import org.onap.portalsdk.core.service.DataAccessService;
 import org.onap.portalsdk.core.util.SystemProperties;
-import org.onap.vid.config.DataSourceConfig;
-import org.onap.vid.config.JobAdapterConfig;
 import org.onap.vid.exceptions.GenericUncheckedException;
 import org.onap.vid.exceptions.OperationNotAllowedException;
 import org.onap.vid.job.Job;
@@ -80,8 +45,12 @@ import org.onap.vid.job.JobType;
 import org.onap.vid.job.JobsBrokerService;
 import org.onap.vid.job.command.JobCommandFactoryTest;
 import org.onap.vid.job.impl.JobDaoImpl;
+import org.onap.vid.job.impl.JobSchedulerInitializer;
 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
+import org.onap.vid.services.VersionService;
 import org.onap.vid.utils.DaoUtils;
+import org.onap.vid.config.DataSourceConfig;
+import org.onap.vid.config.JobAdapterConfig;
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
 import org.testng.Assert;
@@ -90,6 +59,31 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import javax.inject.Inject;
+import java.lang.reflect.Method;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.mockito.Mockito.when;
+import static org.onap.vid.job.Job.JobStatus.*;
+import static org.onap.vid.utils.Streams.not;
+import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+
 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
 
@@ -101,7 +95,7 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
 
     private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
 
-    private final long FEW = 1000;
+    private final long FEW = 500;
 
     private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
     private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
@@ -116,6 +110,23 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
     @Inject
     private SessionFactory sessionFactory;
 
+    @Mock
+    private VersionService versionService;
+
+    @AfterMethod
+    public void threadsCounter() {
+        logger.info("participating threads count: " + threadsIds.size());
+        threadsIds.clear();
+    }
+
+    @BeforeMethod
+    public void initializeBroker() {
+        MockitoAnnotations.initMocks(this);
+        when(versionService.retrieveBuildNumber()).thenReturn("aBuildNumber");
+        broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService);
+        ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
+    }
+
     /*
     - pulling jobs is limited to inserted ones
     - putting back allows getting the job again
@@ -147,6 +158,7 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
                 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
                 UUID.randomUUID(),
                 userId,
+                null,
                 "optimisticUniqueServiceInstanceName",
                 RandomUtils.nextInt());
     }
@@ -234,18 +246,6 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
         threadsIds.add(Thread.currentThread().getId());
     }
 
-    @AfterMethod
-    public void threadsCounter() {
-        logger.info("participating threads count: " + threadsIds.size());
-        threadsIds.clear();
-    }
-
-    @BeforeMethod
-    public void initializeBroker() {
-        broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
-        ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
-    }
-
     @Test
     public void givenSingleJob_getIt_verifySameJob() {
         final Job originalJob = waitForFutureJob(newJobAsync(broker));
@@ -254,6 +254,51 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
     }
 
+    @DataProvider
+    public static Object[][] allTopics() {
+        return JobSchedulerInitializer.WORKERS_TOPICS.stream()
+                .map(topic -> new Object[] { topic })
+                .toArray(Object[][]::new);
+    }
+
+    @Test(dataProvider = "allTopics")
+    public void givenJobFromSameBuild_pullJobs_jobIsPulled(Job.JobStatus topic) {
+        when(versionService.retrieveBuildNumber()).thenReturn("someVersion");
+        Job mockedJob = createMockJob("user id", topic);
+        UUID uuid = broker.add(mockedJob);
+        assertEquals(uuid,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
+    }
+
+
+    @Test(dataProvider = "allTopics")
+    public void givenJobFromOtherBuild_pullJobs_noneIsPulled(Job.JobStatus topic) {
+        when(versionService.retrieveBuildNumber()).thenReturn("old");
+        Job mockedJob = createMockJob("user id", topic);
+        broker.add(mockedJob);
+        when(versionService.retrieveBuildNumber()).thenReturn("new");
+        assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
+    }
+
+    @Test
+    public void givenJobWithNullBuildAndJobWithRealBuild_pullJobs_jobsWithNonNullIsPulled() {
+        Job.JobStatus topic = PENDING;
+
+        //push job with null build
+        when(versionService.retrieveBuildNumber()).thenReturn(null);
+        broker.add(createMockJob("user id", topic));
+
+        //push job with "aBuild" build
+        when(versionService.retrieveBuildNumber()).thenReturn("aBuild");
+        UUID newJobId = broker.add(createMockJob("user id", topic));
+
+        //pull jobs while current build is still "aBuild". Only the non null build is pulled
+        assertEquals(newJobId,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
+
+        //no more jobs to pull
+        assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
+    }
+
+
     @Test
     public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
         final List<Job> originalJobs = putALotOfJobs(broker);
@@ -346,10 +391,10 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
-                  6,
-                  5,
-                  PENDING,
-                  "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
+                        6,
+                        5,
+                        PENDING,
+                        "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
                 },
                 {ImmutableList.of(
                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
@@ -374,7 +419,7 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
                         () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
-                        ),
+                ),
                         3,
                         2,
                         PENDING,
@@ -411,10 +456,10 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
                         () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
                         () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
-                   3,
-                   2,
-                   PENDING,
-                   "Broker should pull pending job when there is another job from this template that was deleted, although failed"
+                        3,
+                        2,
+                        PENDING,
+                        "Broker should pull pending job when there is another job from this template that was deleted, although failed"
                 },
                 { ImmutableList.of(
                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
@@ -475,10 +520,10 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
                         () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
                 ),
-                  20,
-                  6,
-                  IN_PROGRESS,
-                  "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
+                        20,
+                        6,
+                        IN_PROGRESS,
+                        "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
                 },
                 {ImmutableList.of(
                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
@@ -541,14 +586,9 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
 
     @Test(dataProvider = "jobs")
     public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
-        JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
-        final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
-        for (JobDaoImpl job : jobs) {
-            Date modifiedDate = job.getModified();
-            broker.add(job);
-            setModifiedDateToJob(job.getUuid(), modifiedDate);
-        }
-        Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
+        JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService);
+        final List<JobDaoImpl> jobs = addJobsWithModifiedDate(jobbers, aBroker);
+        Optional<Job> nextJob = aBroker.pull(topic, UUID.randomUUID().toString());
         boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
         String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
         Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
@@ -557,10 +597,206 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
         }
     }
 
+    @NotNull
+    protected List<JobDaoImpl> addJobsWithModifiedDate(List<Jobber> jobbers, JobsBrokerService broker) {
+        final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
+        return addJobsWithModifiedDateByJobDao(jobs, broker);
+    }
+
+    @NotNull
+    private List<JobDaoImpl> addJobsWithModifiedDateByJobDao(List<JobDaoImpl> jobs, JobsBrokerService broker) {
+        for (JobDaoImpl job : jobs) {
+            Date modifiedDate = job.getModified();
+            broker.add(job);
+            setModifiedDateToJob(job.getUuid(), modifiedDate);
+        }
+        return jobs;
+    }
+
+    @DataProvider
+    public static Object[][] jobsForTestingPendingResource(Method test) {
+        UUID templateId1 = UUID.fromString("311a9196-bbc5-47a1-8b11-bf0f9db1c7ca");
+        UUID templateId2 = UUID.fromString("4f1522f9-642e-49f7-af75-a2f344085bcc");
+        return new Object[][]{
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(12, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(1, templateId2, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(2, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(3, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false)
+                ),
+                        0,
+                        "given there is only one in the queue in PENDING_RESOURCE and no other job with same templateId, then this job is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(3, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(2), false),
+                        () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
+                ),
+                        2,
+                        "given multiple jobs with same templateId in PENDING_RESOURCE, then job with lowest indexInBulk is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(1, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
+                ),
+                        1,
+                        "given multiple jobs with same indexInBulk, then job with lowest templateId is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
+                        () -> createNewJob(2, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false)
+                ),
+                        0,
+                        "given multiple jobs with different indexInBulk and different templateId, then job with lowest indexInBulk is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+                        () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false)
+                ),
+                        -1,
+                        "given there is already taken job with same templateId, then no job is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+                        () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false),
+                        () -> createNewJob(9, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
+                        () -> createNewJob(8, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
+                ),
+                        3,
+                        "given 4 jobs, 2 jobs templateId1 but one of them is taken, and 2 jobs with templateId2, then select job with templateId2"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+                        () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), true)
+                ),
+                        0,
+                        "given 2 jobs with same templateId, one of them is taken but deleted, then the other job is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+                        () -> createNewJob(1, templateId1, "userId", IN_PROGRESS, null, LocalDateTime.now(), false)
+                ),
+                        -1,
+                        "given 2 jobs with same templateId, one of them is IN_PROGRESS, then no job is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+                        () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false)
+                ),
+                        -1,
+                        "given 2 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS, then no job is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+                        () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+                        () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), true)
+                ),
+                        1,
+                        "given 3 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS but deleted, then other job with lowest indexInBulk is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+                        () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
+                        () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false),
+                        () -> createNewJob(12, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
+                        () -> createNewJob(11, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
+                ),
+                        4,
+                        "given 5 jobs, 3 with templateId1 that one of them is RESOURCE_IN_PROGRESS,"+
+                                "2 with templateId2 both in PENDING_RESOURCE, then job with lowest indexInBulk from templateId2 is selected"
+
+                },
+                {ImmutableList.of( (Jobber)
+                        () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), true)
+                ),
+                        -1,
+                        "given 1 job in PENDING_RESOURCE but it's deleted, then no job is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(20, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(1, templateId1, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(2, templateId1, "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(3, templateId1, "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(4, templateId1, "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(5, templateId1, "userId", STOPPED, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(6, templateId1, "userId", PAUSE, null, LocalDateTime.now().minusSeconds(1), false)
+                ),
+                        0,
+                        "given multiple jobs with same templateId, 1 in PENDING_RESOURCE, and other are not in progress, "+
+                                "then the job in PENDING_RESOURCE is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                                () -> createNewJob(1, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(2, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(3, UUID.randomUUID(), "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(5, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
+                        () -> createNewJob(6, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false)
+                ),
+                        -1,
+                        "given there is no job in PENDING_RESOURCE state, then no job is selected"
+                },
+                {ImmutableList.of( (Jobber)
+                        () -> createNewJob(6, null, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false)
+                ),
+                        -1,
+                        "given there is 1 job in PENDING_RESOURCE but without templateId, then no job is selected"
+                },
+        };
+    }
+
+    @Test(dataProvider = "jobsForTestingPendingResource")
+    public void givenSomeJobs_pullPendingResource_returnNextOrNothingAsExpected(List<Jobber> jobbers, int expectedIndexSelected, String assertionReason) {
+        givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(jobbers, 1, expectedIndexSelected, PENDING_RESOURCE, assertionReason);
+    }
+
+    public static JobDaoImpl createNewJob(Job.JobStatus status, String takenBy, long secondsOffset, boolean deleted) {
+        return createNewJob(1, UUID.randomUUID(), "af456", status, takenBy, LocalDateTime.now().minusSeconds(secondsOffset), deleted);
+    }
+
+    @Test
+    public void givenSomeJobs_deleteOldFinalJobs_onlyExpectedJobsAreDeleted() {
+        long seconds = 999;
+        final List<Pair<JobDaoImpl,Boolean>> jobs = ImmutableList.of(
+                //not final
+                Pair.of(createNewJob(IN_PROGRESS, null, seconds+1, false), true),
+                Pair.of(createNewJob(RESOURCE_IN_PROGRESS, null, seconds+1, false), true),
+                Pair.of(createNewJob(PENDING, null, seconds+1, false), true),
+                Pair.of(createNewJob(CREATING, null, seconds+1, false), true),
+                Pair.of(createNewJob(PENDING_RESOURCE, null, seconds+1, false), true),
+                Pair.of(createNewJob(PAUSE, null, seconds+1, false), true),
+
+                //final
+                Pair.of(createNewJob(COMPLETED, null, seconds+1, false), false),
+                Pair.of(createNewJob(FAILED, null, seconds+1, false), false),
+                Pair.of(createNewJob(STOPPED, null, seconds+1, false), false),
+                Pair.of(createNewJob(COMPLETED_WITH_ERRORS, null, seconds+1, true), false),
+                Pair.of(createNewJob(COMPLETED_WITH_NO_ACTION, generateRandomAlphaNumeric(5), seconds+1, true), false),
+
+                //final but not old
+                Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-2, false), true),
+                Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-400, false), true),
+                Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), 0, false), true)
+        );
+        addJobsWithModifiedDateByJobDao(jobs.stream().map(Pair::getLeft).collect(Collectors.toList()), broker);
+        assertEquals(jobs.size(), broker.peek().size());
+
+        broker.deleteOldFinalJobs(seconds);
+        Stream<Pair<UUID, Job.JobStatus>> expectedJobs = jobs.stream()
+                .filter(Pair::getRight)
+                .map(x -> Pair.of(
+                        x.getLeft().getUuid(),
+                        x.getLeft().getStatus()
+                ));
+        assertThat(broker.peek().stream().map(x->Pair.of(x.getUuid(), x.getStatus())).collect(Collectors.toList()),
+                containsInAnyOrder(expectedJobs.toArray()));
+    }
+
     @DataProvider
     public Object[][] topics() {
         return Arrays.stream(Job.JobStatus.values())
-                .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS).contains(t)))
+                .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS, PENDING_RESOURCE).contains(t)))
                 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
     }
 
@@ -660,7 +896,7 @@ public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
     }
 
     @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
-       public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
+    public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
         final Job job = waitForFutureJob(newJobAsync(broker, status));
         broker.delete(job.getUuid());
         assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");