Extend timeout in DB-related tests of JobsBrokerService
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.services;
22
23
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.stream.Collectors.toList;
26 import static org.hamcrest.CoreMatchers.equalTo;
27 import static org.hamcrest.CoreMatchers.is;
28 import static org.hamcrest.MatcherAssert.assertThat;
29 import static org.hamcrest.Matchers.both;
30 import static org.hamcrest.Matchers.containsInAnyOrder;
31 import static org.mockito.Mockito.when;
32 import static org.onap.vid.job.Job.JobStatus.COMPLETED;
33 import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_ERRORS;
34 import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_NO_ACTION;
35 import static org.onap.vid.job.Job.JobStatus.CREATING;
36 import static org.onap.vid.job.Job.JobStatus.FAILED;
37 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
38 import static org.onap.vid.job.Job.JobStatus.PAUSE;
39 import static org.onap.vid.job.Job.JobStatus.PENDING;
40 import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
41 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
42 import static org.onap.vid.job.Job.JobStatus.STOPPED;
43 import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
44 import static org.onap.vid.utils.Streams.not;
45 import static org.testng.Assert.assertNotNull;
46 import static org.testng.AssertJUnit.assertEquals;
47 import static org.testng.AssertJUnit.assertFalse;
48
49 import com.google.common.collect.ImmutableList;
50 import com.google.common.collect.ImmutableMap;
51 import java.lang.reflect.Method;
52 import java.time.LocalDateTime;
53 import java.time.ZoneId;
54 import java.util.Arrays;
55 import java.util.Date;
56 import java.util.List;
57 import java.util.Optional;
58 import java.util.Set;
59 import java.util.UUID;
60 import java.util.concurrent.ConcurrentSkipListSet;
61 import java.util.concurrent.ExecutionException;
62 import java.util.concurrent.ExecutorService;
63 import java.util.concurrent.Executors;
64 import java.util.concurrent.Future;
65 import java.util.concurrent.TimeoutException;
66 import java.util.stream.Collectors;
67 import java.util.stream.IntStream;
68 import java.util.stream.Stream;
69 import javax.inject.Inject;
70 import org.apache.commons.lang.RandomStringUtils;
71 import org.apache.commons.lang3.RandomUtils;
72 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
73 import org.apache.commons.lang3.builder.ToStringStyle;
74 import org.apache.commons.lang3.tuple.Pair;
75 import org.apache.log4j.LogManager;
76 import org.apache.log4j.Logger;
77 import org.hibernate.SessionFactory;
78 import org.jetbrains.annotations.NotNull;
79 import org.mockito.Mock;
80 import org.mockito.MockitoAnnotations;
81 import org.onap.portalsdk.core.domain.support.DomainVo;
82 import org.onap.portalsdk.core.service.DataAccessService;
83 import org.onap.portalsdk.core.util.SystemProperties;
84 import org.onap.vid.config.DataSourceConfig;
85 import org.onap.vid.config.JobAdapterConfig;
86 import org.onap.vid.exceptions.GenericUncheckedException;
87 import org.onap.vid.exceptions.OperationNotAllowedException;
88 import org.onap.vid.job.Job;
89 import org.onap.vid.job.JobAdapter;
90 import org.onap.vid.job.JobAdapter.AsyncJobRequest;
91 import org.onap.vid.job.JobType;
92 import org.onap.vid.job.JobsBrokerService;
93 import org.onap.vid.job.command.JobCommandFactoryTest;
94 import org.onap.vid.job.impl.JobDaoImpl;
95 import org.onap.vid.job.impl.JobSchedulerInitializer;
96 import org.onap.vid.job.impl.JobSharedData;
97 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
98 import org.onap.vid.utils.DaoUtils;
99 import org.springframework.test.context.ContextConfiguration;
100 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
101 import org.testng.Assert;
102 import org.testng.annotations.AfterMethod;
103 import org.testng.annotations.BeforeMethod;
104 import org.testng.annotations.DataProvider;
105 import org.testng.annotations.Test;
106
107 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
108 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
109
110     private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
111
112     private static final int JOBS_COUNT = 127;
113     private static final boolean DELETED = true;
114     private final ExecutorService executor = Executors.newFixedThreadPool(90);
115
116     private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
117
118     private final long FEW = 1000;
119     private final long SOME = 2000;
120
121     private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
122     private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
123     private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
124     private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
125     private JobsBrokerService broker;
126
127     @Inject
128     JobAdapter jobAdapter;
129     @Inject
130     private DataAccessService dataAccessService;
131     @Inject
132     private SessionFactory sessionFactory;
133
134     @Mock
135     private VersionService versionService;
136
137     @AfterMethod
138     public void threadsCounter() {
139         logger.info("participating threads count: " + threadsIds.size());
140         threadsIds.clear();
141     }
142
143     @BeforeMethod
144     public void initializeBroker() {
145         MockitoAnnotations.initMocks(this);
146         when(versionService.retrieveBuildNumber()).thenReturn("aBuildNumber");
147         broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService);
148         ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
149     }
150
151     /*
152     - pulling jobs is limited to inserted ones
153     - putting back allows getting the job again
154     - multi threads safety
155     - any added job should be visible with view
156
157     - edges:
158         - pulling with empty repo should return empty optional
159         - pulling more than expected should return empty optional
160         - putting one, over-pulling from a different thread
161         - take before inserting, then insert while waiting
162
163      */
164
165     private class NoJobException extends RuntimeException {
166     }
167
168     private Future<Job> newJobAsync(JobsBrokerService b) {
169         return newJobAsync(b, createMockJob("user id"));
170     }
171
172     private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
173         return newJobAsync(b, createMockJob("user id", status));
174     }
175
176     private Job createMockJob(String userId) {
177         return jobAdapter.createServiceInstantiationJob(
178                 JobType.NoOp,
179                 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
180                 UUID.randomUUID(),
181                 userId,
182                 null,
183                 "optimisticUniqueServiceInstanceName",
184                 RandomUtils.nextInt());
185     }
186
187     private Job createMockJob(String userId, Job.JobStatus jobStatus) {
188         Job job = createMockJob(userId);
189         job.setStatus(jobStatus);
190         return job;
191     }
192
193     private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
194         final Future<Job> jobFuture = executor.submit(() -> {
195             accountThreadId();
196
197             b.add(job);
198
199             return job;
200         });
201         return jobFuture;
202     }
203
204     private void pushBackJobAsync(JobsBrokerService b, Job job) {
205         executor.submit(() -> {
206             accountThreadId();
207             b.pushBack(job);
208             return job;
209         });
210     }
211
212     private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
213         final Future<Optional<Job>> job = executor.submit(() -> {
214             accountThreadId();
215             // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
216             return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
217         });
218         return job;
219     }
220
221     private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
222         try {
223             return retrievedOptionalJobFuture.get(SOME, MILLISECONDS).orElseThrow(NoJobException::new);
224         } catch (TimeoutException | InterruptedException | ExecutionException e) {
225             throw new RuntimeException(e);
226         }
227     }
228
229     private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
230         try {
231             return retrievedJobFuture.get(SOME, MILLISECONDS);
232         } catch (TimeoutException | InterruptedException | ExecutionException e) {
233             throw new RuntimeException(e);
234         }
235     }
236
237     private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
238         final List<Job> originalJobs = putALotOfJobs(broker);
239         final List<Job> retrievedJobs = getAlotOfJobs(broker);
240
241         assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
242
243         return retrievedJobs;
244     }
245
246     private List<Job> putALotOfJobs(JobsBrokerService broker) {
247         int n = JOBS_COUNT;
248         return IntStream.range(0, n)
249                 .mapToObj(i -> newJobAsync(broker))
250                 .map(this::waitForFutureJob)
251                 .collect(toList());
252     }
253
254     private List<Job> getAlotOfJobs(JobsBrokerService broker) {
255         int n = JOBS_COUNT;
256         return IntStream.range(0, n)
257                 .mapToObj(i -> pullJobAsync(broker))
258                 .map(this::waitForFutureOptionalJob)
259                 .collect(toList());
260     }
261
262     private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
263         jobs.forEach(job -> pushBackJobAsync(broker, job));
264     }
265
266     private void accountThreadId() {
267         threadsIds.add(Thread.currentThread().getId());
268     }
269
270     @Test
271     public void givenSingleJob_getIt_verifySameJob() {
272         final Job originalJob = waitForFutureJob(newJobAsync(broker));
273
274         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
275         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
276     }
277
278     @DataProvider
279     public static Object[][] allTopics() {
280         return JobSchedulerInitializer.WORKERS_TOPICS.stream()
281                 .map(topic -> new Object[] { topic })
282                 .toArray(Object[][]::new);
283     }
284
285     @Test(dataProvider = "allTopics")
286     public void givenJobFromSameBuild_pullJobs_jobIsPulled(Job.JobStatus topic) {
287         when(versionService.retrieveBuildNumber()).thenReturn("someVersion");
288         Job mockedJob = createMockJob("user id", topic);
289         UUID uuid = broker.add(mockedJob);
290         assertEquals(uuid,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
291     }
292
293
294     @Test(dataProvider = "allTopics")
295     public void givenJobFromOtherBuild_pullJobs_noneIsPulled(Job.JobStatus topic) {
296         when(versionService.retrieveBuildNumber()).thenReturn("old");
297         Job mockedJob = createMockJob("user id", topic);
298         broker.add(mockedJob);
299         when(versionService.retrieveBuildNumber()).thenReturn("new");
300         assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
301     }
302
303     @Test
304     public void givenJobWithNullBuildAndJobWithRealBuild_pullJobs_jobsWithNonNullIsPulled() {
305         Job.JobStatus topic = PENDING;
306
307         //push job with null build
308         when(versionService.retrieveBuildNumber()).thenReturn(null);
309         broker.add(createMockJob("user id", topic));
310
311         //push job with "aBuild" build
312         when(versionService.retrieveBuildNumber()).thenReturn("aBuild");
313         UUID newJobId = broker.add(createMockJob("user id", topic));
314
315         //pull jobs while current build is still "aBuild". Only the non null build is pulled
316         assertEquals(newJobId,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
317
318         //no more jobs to pull
319         assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
320     }
321
322
323     @Test
324     public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
325         final List<Job> originalJobs = putALotOfJobs(broker);
326
327         MILLISECONDS.sleep(FEW);
328         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
329
330         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
331
332         MILLISECONDS.sleep(FEW);
333         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
334
335         pushBackJobAsync(broker, retrievedJob);
336
337         MILLISECONDS.sleep(FEW);
338         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
339     }
340
341     @Test
342     public void givenManyJobs_getThemAll_verifySameJobs() {
343         putAndGetALotOfJobs(broker);
344     }
345
346     @Test
347     public void givenManyJobs_getThemAllThenPushBackAndGet_verifySameJobs() {
348         final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
349
350         pushBackJobs(retrievedJobs1, broker);
351         final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
352
353         assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
354     }
355
356     private static Date toDate(LocalDateTime localDateTime) {
357         return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
358     }
359
360     private void setModifiedDateToJob(UUID jobUuid, Date date) {
361         DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
362         job.setModified(date);
363         DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
364             session.saveOrUpdate(job);
365             return 1;
366         });
367     }
368
369
370     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
371         return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
372     }
373
374     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
375         JobDaoImpl job = new JobDaoImpl();
376         job.setUuid(UUID.randomUUID());
377         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
378         job.setIndexInBulk(indexInBulk);
379         job.setTemplateId(templateId);
380         job.setType(JobType.NoOp);
381         job.setStatus(status);
382         job.setTakenBy(takenBy);
383         job.setCreated(toDate(date));
384         job.setModified(toDate(date));
385         job.setUserId(userId);
386         if (deleted) {
387             job.setDeletedAt(new Date());
388         }
389         return job;
390     }
391
392     @DataProvider
393     public static Object[][] jobs(Method test) {
394         LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
395         UUID sameTemplate = UUID.randomUUID();
396         return new Object[][]{
397                 {ImmutableList.of(
398                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
399                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
400                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
401                         () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
402                         4,
403                         0,
404                         PENDING,
405                         "Broker should pull the first pending job by oldest date then by job index"
406                 },
407                 { ImmutableList.of(
408                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
409                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
410                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
411                         () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
412                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
413                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
414                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
415                         6,
416                         5,
417                         PENDING,
418                         "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
419                 },
420                 {ImmutableList.of(
421                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
422                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
423                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
424                         2,
425                         -1,
426                         PENDING,
427                         "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
428                 },
429                 {ImmutableList.of(
430                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
431                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
432                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
433                         2,
434                         -1,
435                         PENDING,
436                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
437                 },
438                 {ImmutableList.of(
439                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
440                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
441                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
442                         () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
443                 ),
444                         3,
445                         2,
446                         PENDING,
447                         "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
448                 },
449                 {ImmutableList.of(
450                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
451                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
452                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
453                         3,
454                         -1,
455                         PENDING,
456                         "Broker should not pull any job when there is another job from this template that was taken"
457                 },
458                 {ImmutableList.of(
459                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
460                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
461                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
462                         3,
463                         -1,
464                         PENDING,
465                         "Broker should not pull any job when there is another job from this template that in progress"
466                 },
467                 {ImmutableList.of(
468                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
469                         () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
470                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
471                         3,
472                         -1,
473                         PENDING,
474                         "Broker should not pull any job when there is another job from this template that was failed"
475                 },
476                 {ImmutableList.of(
477                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
478                         () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
479                         () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
480                         3,
481                         2,
482                         PENDING,
483                         "Broker should pull pending job when there is another job from this template that was deleted, although failed"
484                 },
485                 { ImmutableList.of(
486                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
487                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
488                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
489                         3,
490                         2,
491                         PENDING,
492                         "Broker should prioritize jobs of user that has no in-progress jobs"
493                 },
494                 {ImmutableList.of(
495                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
496                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
497                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
498                         3,
499                         2,
500                         PENDING,
501                         "Broker should prioritize jobs of user that has no taken jobs"
502                 },
503                 {ImmutableList.of(
504                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
505                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
506                         () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
507                         () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
508                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
509                         5,
510                         4,
511                         PENDING,
512                         "Broker should take oldest job when there is one in-progress job to each user"
513                 },
514                 {ImmutableList.of(
515                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
516                         () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
517                         () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
518                         2,
519                         -1,
520                         PENDING,
521                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
522                 },
523                 {ImmutableList.of(
524                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
525                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
526                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
527                         () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
528                         20,
529                         1,
530                         IN_PROGRESS,
531                         "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
532                 },
533                 {ImmutableList.of(
534                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
535                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
536                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
537                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
538                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
539                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
540                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
541                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
542                         () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
543                 ),
544                         20,
545                         6,
546                         IN_PROGRESS,
547                         "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
548                 },
549                 {ImmutableList.of(
550                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
551                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
552                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
553                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
554                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
555                         () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
556                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
557                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
558                         () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
559                 ),
560                         20,
561                         6,
562                         RESOURCE_IN_PROGRESS,
563                         "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
564                 },
565                 {ImmutableList.of(
566                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
567                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
568                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
569                         20,
570                         -1,
571                         IN_PROGRESS,
572                         "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
573                 },
574                 {ImmutableList.of(
575                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
576                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
577                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
578                         20,
579                         -1,
580                         RESOURCE_IN_PROGRESS,
581                         "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
582                 },
583                 {ImmutableList.of(
584                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
585                         () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
586                         () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
587                         1,
588                         2,
589                         CREATING,
590                         "Broker with creating topic should pull oldest creating job and ignore mso limit"
591                 },
592                 {ImmutableList.of(
593                         (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
594                         1,
595                         0,
596                         CREATING,
597                         "Broker with CREATING topic should pull CREATING job that was just modified"
598                 }
599
600         };
601     }
602
603     public interface Jobber {
604         // Will defer LocalDateTime.now() to test's "real-time"
605         JobDaoImpl toJob();
606     }
607
608     @Test(dataProvider = "jobs")
609     public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
610         JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService);
611         final List<JobDaoImpl> jobs = addJobsWithModifiedDate(jobbers, aBroker);
612         Optional<Job> nextJob = aBroker.pull(topic, UUID.randomUUID().toString());
613         boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
614         String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
615         Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
616         if (shouldAnyBeSelected) {
617             Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
618         }
619     }
620
621     @NotNull
622     protected List<JobDaoImpl> addJobsWithModifiedDate(List<Jobber> jobbers, JobsBrokerService broker) {
623         final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
624         return addJobsWithModifiedDateByJobDao(jobs, broker);
625     }
626
627     @NotNull
628     private List<JobDaoImpl> addJobsWithModifiedDateByJobDao(List<JobDaoImpl> jobs, JobsBrokerService broker) {
629         for (JobDaoImpl job : jobs) {
630             Date modifiedDate = job.getModified();
631             broker.add(job);
632             setModifiedDateToJob(job.getUuid(), modifiedDate);
633         }
634         return jobs;
635     }
636
637     @DataProvider
638     public static Object[][] jobsForTestingPendingResource(Method test) {
639         UUID templateId1 = UUID.fromString("311a9196-bbc5-47a1-8b11-bf0f9db1c7ca");
640         UUID templateId2 = UUID.fromString("4f1522f9-642e-49f7-af75-a2f344085bcc");
641         return new Object[][]{
642                 {ImmutableList.of( (Jobber)
643                                 () -> createNewJob(12, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
644                         () -> createNewJob(1, templateId2, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
645                         () -> createNewJob(2, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
646                         () -> createNewJob(3, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
647                         () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false)
648                 ),
649                         0,
650                         "given there is only one in the queue in PENDING_RESOURCE and no other job with same templateId, then this job is selected"
651                 },
652                 {ImmutableList.of( (Jobber)
653                                 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
654                         () -> createNewJob(3, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(2), false),
655                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
656                 ),
657                         2,
658                         "given multiple jobs with same templateId in PENDING_RESOURCE, then job with lowest indexInBulk is selected"
659                 },
660                 {ImmutableList.of( (Jobber)
661                                 () -> createNewJob(1, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
662                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
663                 ),
664                         1,
665                         "given multiple jobs with same indexInBulk, then job with lowest templateId is selected"
666                 },
667                 {ImmutableList.of( (Jobber)
668                                 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
669                         () -> createNewJob(2, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false)
670                 ),
671                         0,
672                         "given multiple jobs with different indexInBulk and different templateId, then job with lowest indexInBulk is selected"
673                 },
674                 {ImmutableList.of( (Jobber)
675                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
676                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false)
677                 ),
678                         -1,
679                         "given there is already taken job with same templateId, then no job is selected"
680                 },
681                 {ImmutableList.of( (Jobber)
682                                 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
683                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false),
684                         () -> createNewJob(9, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
685                         () -> createNewJob(8, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
686                 ),
687                         3,
688                         "given 4 jobs, 2 jobs templateId1 but one of them is taken, and 2 jobs with templateId2, then select job with templateId2"
689                 },
690                 {ImmutableList.of( (Jobber)
691                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
692                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), true)
693                 ),
694                         0,
695                         "given 2 jobs with same templateId, one of them is taken but deleted, then the other job is selected"
696                 },
697                 {ImmutableList.of( (Jobber)
698                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
699                         () -> createNewJob(1, templateId1, "userId", IN_PROGRESS, null, LocalDateTime.now(), false)
700                 ),
701                         -1,
702                         "given 2 jobs with same templateId, one of them is IN_PROGRESS, then no job is selected"
703                 },
704                 {ImmutableList.of( (Jobber)
705                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
706                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false)
707                 ),
708                         -1,
709                         "given 2 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS, then no job is selected"
710                 },
711                 {ImmutableList.of( (Jobber)
712                                 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
713                         () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
714                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), true)
715                 ),
716                         1,
717                         "given 3 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS but deleted, then other job with lowest indexInBulk is selected"
718                 },
719                 {ImmutableList.of( (Jobber)
720                                 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
721                         () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
722                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false),
723                         () -> createNewJob(12, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
724                         () -> createNewJob(11, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
725                 ),
726                         4,
727                         "given 5 jobs, 3 with templateId1 that one of them is RESOURCE_IN_PROGRESS,"+
728                                 "2 with templateId2 both in PENDING_RESOURCE, then job with lowest indexInBulk from templateId2 is selected"
729
730                 },
731                 {ImmutableList.of( (Jobber)
732                         () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), true)
733                 ),
734                         -1,
735                         "given 1 job in PENDING_RESOURCE but it's deleted, then no job is selected"
736                 },
737                 {ImmutableList.of( (Jobber)
738                                 () -> createNewJob(20, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
739                         () -> createNewJob(1, templateId1, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
740                         () -> createNewJob(2, templateId1, "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
741                         () -> createNewJob(3, templateId1, "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
742                         () -> createNewJob(4, templateId1, "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
743                         () -> createNewJob(5, templateId1, "userId", STOPPED, null, LocalDateTime.now().minusSeconds(1), false),
744                         () -> createNewJob(6, templateId1, "userId", PAUSE, null, LocalDateTime.now().minusSeconds(1), false)
745                 ),
746                         0,
747                         "given multiple jobs with same templateId, 1 in PENDING_RESOURCE, and other are not in progress, "+
748                                 "then the job in PENDING_RESOURCE is selected"
749                 },
750                 {ImmutableList.of( (Jobber)
751                                 () -> createNewJob(1, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
752                         () -> createNewJob(2, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
753                         () -> createNewJob(3, UUID.randomUUID(), "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
754                         () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
755                         () -> createNewJob(5, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
756                         () -> createNewJob(6, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false)
757                 ),
758                         -1,
759                         "given there is no job in PENDING_RESOURCE state, then no job is selected"
760                 },
761                 {ImmutableList.of( (Jobber)
762                         () -> createNewJob(6, null, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false)
763                 ),
764                         -1,
765                         "given there is 1 job in PENDING_RESOURCE but without templateId, then no job is selected"
766                 },
767         };
768     }
769
770     @Test(dataProvider = "jobsForTestingPendingResource")
771     public void givenSomeJobs_pullPendingResource_returnNextOrNothingAsExpected(List<Jobber> jobbers, int expectedIndexSelected, String assertionReason) {
772         givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(jobbers, 1, expectedIndexSelected, PENDING_RESOURCE, assertionReason);
773     }
774
775     public static JobDaoImpl createNewJob(Job.JobStatus status, String takenBy, long secondsOffset, boolean deleted) {
776         return createNewJob(1, UUID.randomUUID(), "af456", status, takenBy, LocalDateTime.now().minusSeconds(secondsOffset), deleted);
777     }
778
779     @Test
780     public void givenSomeJobs_deleteOldFinalJobs_onlyExpectedJobsAreDeleted() {
781         long seconds = 999;
782         final List<Pair<JobDaoImpl,Boolean>> jobs = ImmutableList.of(
783                 //not final
784                 Pair.of(createNewJob(IN_PROGRESS, null, seconds+1, false), true),
785                 Pair.of(createNewJob(RESOURCE_IN_PROGRESS, null, seconds+1, false), true),
786                 Pair.of(createNewJob(PENDING, null, seconds+1, false), true),
787                 Pair.of(createNewJob(CREATING, null, seconds+1, false), true),
788                 Pair.of(createNewJob(PENDING_RESOURCE, null, seconds+1, false), true),
789                 Pair.of(createNewJob(PAUSE, null, seconds+1, false), true),
790
791                 //final
792                 Pair.of(createNewJob(COMPLETED, null, seconds+1, false), false),
793                 Pair.of(createNewJob(FAILED, null, seconds+1, false), false),
794                 Pair.of(createNewJob(STOPPED, null, seconds+1, false), false),
795                 Pair.of(createNewJob(COMPLETED_WITH_ERRORS, null, seconds+1, true), false),
796                 Pair.of(createNewJob(COMPLETED_WITH_NO_ACTION, generateRandomAlphaNumeric(5), seconds+1, true), false),
797
798                 //final but not old
799                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-2, false), true),
800                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-400, false), true),
801                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), 0, false), true)
802         );
803         addJobsWithModifiedDateByJobDao(jobs.stream().map(Pair::getLeft).collect(Collectors.toList()), broker);
804         assertEquals(jobs.size(), broker.peek().size());
805
806         broker.deleteOldFinalJobs(seconds);
807         Stream<Pair<UUID, Job.JobStatus>> expectedJobs = jobs.stream()
808                 .filter(Pair::getRight)
809                 .map(x -> Pair.of(
810                         x.getLeft().getUuid(),
811                         x.getLeft().getStatus()
812                 ));
813         assertThat(broker.peek().stream().map(x->Pair.of(x.getUuid(), x.getStatus())).collect(Collectors.toList()),
814                 containsInAnyOrder(expectedJobs.toArray()));
815     }
816
817     @DataProvider
818     public Object[][] topics() {
819         return Arrays.stream(Job.JobStatus.values())
820                 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS, PENDING_RESOURCE).contains(t)))
821                 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
822     }
823
824     @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
825     public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
826         broker.pull(topic, UUID.randomUUID().toString());
827     }
828
829     @Test(expectedExceptions = NoJobException.class)
830     public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
831         Stream.of(Job.JobStatus.values())
832                 .filter(not(s -> s.equals(PENDING)))
833                 .map(s -> createMockJob("some user id", s))
834                 .map(job -> newJobAsync(broker, job))
835                 .map(this::waitForFutureJob)
836                 .collect(toList());
837
838         waitForFutureOptionalJob(pullJobAsync(broker));
839     }
840
841     @Test
842     public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
843         newJobAsync(broker); // this negated the expected result of the call below
844         givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
845     }
846
847     @Test(expectedExceptions = NoJobException.class)
848     public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
849         putAndGetALotOfJobs(broker);
850         waitForFutureOptionalJob(pullJobAsync(broker));
851     }
852
853     @Test(expectedExceptions = NoJobException.class)
854     public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
855         final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
856         assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
857         waitForFutureOptionalJob(futureOptionalJob);
858     }
859
860     @Test(expectedExceptions = IllegalStateException.class)
861     public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
862         waitForFutureJob(newJobAsync(broker));
863         waitForFutureJob(newJobAsync(broker));
864         waitForFutureOptionalJob(pullJobAsync(broker));
865
866         Job myJob = createMockJob("user id");
867         myJob.setUuid(UUID.randomUUID());
868
869         broker.pushBack(myJob); //Should fail
870     }
871
872     @Test
873     public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
874         final ImmutableMap<String, Object> randomDataForMostRecentJobType =
875                 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
876
877         waitForFutureJob(newJobAsync(broker));
878         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
879
880         job.setStatus(Job.JobStatus.PENDING);
881         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
882         job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
883         job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
884
885         broker.pushBack(job);
886         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
887
888         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
889         assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
890         assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
891     }
892
893     private static String jobDataReflected(Job job) {
894         return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
895                 .setExcludeFieldNames("created", "modified", "takenBy")
896                 .toString();
897     }
898
899     @Test(expectedExceptions = IllegalStateException.class)
900     public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
901         waitForFutureJob(newJobAsync(broker));
902         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
903
904         broker.pushBack(job);
905         broker.pushBack(job); //Should fail
906     }
907
908     @Test
909     public void addJob_PeekItById_verifySameJobWasPeeked() {
910         String userId = UUID.randomUUID().toString();
911         Job myJob = createMockJob(userId);
912         UUID uuid = broker.add(myJob);
913         Job peekedJob = broker.peek(uuid);
914         assertEquals("added testId is not the same as peeked TestsId",
915                 userId,
916                 peekedJob.getSharedData().getUserId());
917     }
918
919     @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
920     public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
921         final Job job = waitForFutureJob(newJobAsync(broker, status));
922         broker.delete(job.getUuid());
923         assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
924         waitForFutureOptionalJob(pullJobAsync(broker));
925     }
926
927     @DataProvider
928     public static Object[][] jobStatusesForSuccessDelete() {
929         return new Object[][]{
930                 {PENDING},
931                 {STOPPED}
932         };
933     }
934
935     @Test(
936             dataProvider = "jobStatusesForFailedDelete",
937             expectedExceptions = OperationNotAllowedException.class,
938             expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
939     )
940     public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
941         final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
942
943         if (taken) {
944             waitForFutureOptionalJob(pullJobAsync(broker));
945         }
946
947
948         broker.delete(job.getUuid());
949     }
950
951     @DataProvider
952     public static Object[][] jobStatusesForFailedDelete() {
953         return new Object[][]{
954                 {PENDING, true},
955                 {IN_PROGRESS, false},
956                 {COMPLETED, false},
957                 {PAUSE, false},
958                 {FAILED, false},
959         };
960     }
961
962     @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
963     public void deleteJob_notExist_exceptionIsThrown() {
964         waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
965         broker.delete(new UUID(111, 111));
966     }
967
968     public static class MockAsyncRequest implements AsyncJobRequest {
969         public String value;
970
971         public MockAsyncRequest() {}
972
973         public MockAsyncRequest(String value) {
974             this.value = value;
975         }
976
977         public String getValue() {
978             return value;
979         }
980     }
981
982     @Test
983     public void twoJobsWithSamePosition_bothJobsArePulled(){
984         UUID uuid = UUID.randomUUID();
985         int positionInBulk = RandomUtils.nextInt();
986         String userId = "userId";
987
988         Optional<Job> firstPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "first value");
989         Optional<Job> secondPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "second value");
990
991         MockAsyncRequest firstValue = (MockAsyncRequest) firstPulledJob.get().getSharedData().getRequest();
992         MockAsyncRequest secondValue = (MockAsyncRequest) secondPulledJob.get().getSharedData().getRequest();
993         assertThat(ImmutableList.of(firstValue.value, secondValue.value),
994             containsInAnyOrder("first value", "second value"));
995     }
996
997     private Optional<Job> createAddAndPullJob(UUID uuid, int positionInBulk, String userId, String s) {
998         JobDaoImpl job1 = createNewJob(positionInBulk, uuid, userId, CREATING, null,
999             LocalDateTime.now().minusSeconds(1), false);
1000         job1.setSharedData(new JobSharedData(null, userId, new MockAsyncRequest(s), "testApi"));
1001         broker.add(job1);
1002         return broker.pull(CREATING, userId);
1003     }
1004 }