Extend timeout in test JobsBrokerService
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.services;
22
23
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.stream.Collectors.toList;
26 import static org.hamcrest.CoreMatchers.equalTo;
27 import static org.hamcrest.CoreMatchers.is;
28 import static org.hamcrest.MatcherAssert.assertThat;
29 import static org.hamcrest.Matchers.both;
30 import static org.hamcrest.Matchers.containsInAnyOrder;
31 import static org.mockito.Mockito.when;
32 import static org.onap.vid.job.Job.JobStatus.COMPLETED;
33 import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_ERRORS;
34 import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_NO_ACTION;
35 import static org.onap.vid.job.Job.JobStatus.CREATING;
36 import static org.onap.vid.job.Job.JobStatus.FAILED;
37 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
38 import static org.onap.vid.job.Job.JobStatus.PAUSE;
39 import static org.onap.vid.job.Job.JobStatus.PENDING;
40 import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
41 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
42 import static org.onap.vid.job.Job.JobStatus.STOPPED;
43 import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
44 import static org.onap.vid.utils.Streams.not;
45 import static org.testng.Assert.assertNotNull;
46 import static org.testng.AssertJUnit.assertEquals;
47 import static org.testng.AssertJUnit.assertFalse;
48
49 import com.google.common.collect.ImmutableList;
50 import com.google.common.collect.ImmutableMap;
51 import java.lang.reflect.Method;
52 import java.time.LocalDateTime;
53 import java.time.ZoneId;
54 import java.util.Arrays;
55 import java.util.Date;
56 import java.util.List;
57 import java.util.Optional;
58 import java.util.Set;
59 import java.util.UUID;
60 import java.util.concurrent.ConcurrentSkipListSet;
61 import java.util.concurrent.ExecutionException;
62 import java.util.concurrent.ExecutorService;
63 import java.util.concurrent.Executors;
64 import java.util.concurrent.Future;
65 import java.util.concurrent.TimeoutException;
66 import java.util.stream.Collectors;
67 import java.util.stream.IntStream;
68 import java.util.stream.Stream;
69 import javax.inject.Inject;
70 import org.apache.commons.lang.RandomStringUtils;
71 import org.apache.commons.lang3.RandomUtils;
72 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
73 import org.apache.commons.lang3.builder.ToStringStyle;
74 import org.apache.commons.lang3.tuple.Pair;
75 import org.apache.log4j.LogManager;
76 import org.apache.log4j.Logger;
77 import org.hibernate.SessionFactory;
78 import org.jetbrains.annotations.NotNull;
79 import org.mockito.Mock;
80 import org.mockito.MockitoAnnotations;
81 import org.onap.portalsdk.core.domain.support.DomainVo;
82 import org.onap.portalsdk.core.service.DataAccessService;
83 import org.onap.portalsdk.core.util.SystemProperties;
84 import org.onap.vid.config.DataSourceConfig;
85 import org.onap.vid.config.JobAdapterConfig;
86 import org.onap.vid.exceptions.GenericUncheckedException;
87 import org.onap.vid.exceptions.OperationNotAllowedException;
88 import org.onap.vid.job.Job;
89 import org.onap.vid.job.JobAdapter;
90 import org.onap.vid.job.JobAdapter.AsyncJobRequest;
91 import org.onap.vid.job.JobType;
92 import org.onap.vid.job.JobsBrokerService;
93 import org.onap.vid.job.command.JobCommandFactoryTest;
94 import org.onap.vid.job.impl.JobDaoImpl;
95 import org.onap.vid.job.impl.JobSchedulerInitializer;
96 import org.onap.vid.job.impl.JobSharedData;
97 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
98 import org.onap.vid.utils.DaoUtils;
99 import org.springframework.test.context.ContextConfiguration;
100 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
101 import org.testng.Assert;
102 import org.testng.annotations.AfterMethod;
103 import org.testng.annotations.BeforeMethod;
104 import org.testng.annotations.DataProvider;
105 import org.testng.annotations.Test;
106
107 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
108 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
109
110     private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
111
112     private static final int JOBS_COUNT = 127;
113     private static final boolean DELETED = true;
114     private final ExecutorService executor = Executors.newFixedThreadPool(90);
115
116     private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
117
118     private final long FEW = 1000;
119
120     private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
121     private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
122     private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
123     private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
124     private JobsBrokerService broker;
125
126     @Inject
127     JobAdapter jobAdapter;
128     @Inject
129     private DataAccessService dataAccessService;
130     @Inject
131     private SessionFactory sessionFactory;
132
133     @Mock
134     private VersionService versionService;
135
136     @AfterMethod
137     public void threadsCounter() {
138         logger.info("participating threads count: " + threadsIds.size());
139         threadsIds.clear();
140     }
141
142     @BeforeMethod
143     public void initializeBroker() {
144         MockitoAnnotations.initMocks(this);
145         when(versionService.retrieveBuildNumber()).thenReturn("aBuildNumber");
146         broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService);
147         ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
148     }
149
/*
- pulling jobs is limited to the inserted ones
- putting a job back allows getting it again
- multi-thread safety
- any added job should be visible with view

- edges:
    - pulling from an empty repo should return an empty optional
    - pulling more than expected should return an empty optional
    - putting one, over-pulling from a different thread
    - take before inserting, then insert while waiting

 */
163
164     private class NoJobException extends RuntimeException {
165     }
166
167     private Future<Job> newJobAsync(JobsBrokerService b) {
168         return newJobAsync(b, createMockJob("user id"));
169     }
170
171     private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
172         return newJobAsync(b, createMockJob("user id", status));
173     }
174
175     private Job createMockJob(String userId) {
176         return jobAdapter.createServiceInstantiationJob(
177                 JobType.NoOp,
178                 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
179                 UUID.randomUUID(),
180                 userId,
181                 null,
182                 "optimisticUniqueServiceInstanceName",
183                 RandomUtils.nextInt());
184     }
185
186     private Job createMockJob(String userId, Job.JobStatus jobStatus) {
187         Job job = createMockJob(userId);
188         job.setStatus(jobStatus);
189         return job;
190     }
191
192     private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
193         final Future<Job> jobFuture = executor.submit(() -> {
194             accountThreadId();
195
196             b.add(job);
197
198             return job;
199         });
200         return jobFuture;
201     }
202
203     private void pushBackJobAsync(JobsBrokerService b, Job job) {
204         executor.submit(() -> {
205             accountThreadId();
206             b.pushBack(job);
207             return job;
208         });
209     }
210
211     private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
212         final Future<Optional<Job>> job = executor.submit(() -> {
213             accountThreadId();
214             // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
215             return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
216         });
217         return job;
218     }
219
220     private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
221         try {
222             return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
223         } catch (TimeoutException | InterruptedException | ExecutionException e) {
224             throw new RuntimeException(e);
225         }
226     }
227
228     private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
229         try {
230             return retrievedJobFuture.get(FEW, MILLISECONDS);
231         } catch (TimeoutException | InterruptedException | ExecutionException e) {
232             throw new RuntimeException(e);
233         }
234     }
235
236     private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
237         final List<Job> originalJobs = putALotOfJobs(broker);
238         final List<Job> retrievedJobs = getAlotOfJobs(broker);
239
240         assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
241
242         return retrievedJobs;
243     }
244
245     private List<Job> putALotOfJobs(JobsBrokerService broker) {
246         int n = JOBS_COUNT;
247         return IntStream.range(0, n)
248                 .mapToObj(i -> newJobAsync(broker))
249                 .map(this::waitForFutureJob)
250                 .collect(toList());
251     }
252
253     private List<Job> getAlotOfJobs(JobsBrokerService broker) {
254         int n = JOBS_COUNT;
255         return IntStream.range(0, n)
256                 .mapToObj(i -> pullJobAsync(broker))
257                 .map(this::waitForFutureOptionalJob)
258                 .collect(toList());
259     }
260
261     private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
262         jobs.forEach(job -> pushBackJobAsync(broker, job));
263     }
264
265     private void accountThreadId() {
266         threadsIds.add(Thread.currentThread().getId());
267     }
268
269     @Test
270     public void givenSingleJob_getIt_verifySameJob() {
271         final Job originalJob = waitForFutureJob(newJobAsync(broker));
272
273         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
274         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
275     }
276
277     @DataProvider
278     public static Object[][] allTopics() {
279         return JobSchedulerInitializer.WORKERS_TOPICS.stream()
280                 .map(topic -> new Object[] { topic })
281                 .toArray(Object[][]::new);
282     }
283
284     @Test(dataProvider = "allTopics")
285     public void givenJobFromSameBuild_pullJobs_jobIsPulled(Job.JobStatus topic) {
286         when(versionService.retrieveBuildNumber()).thenReturn("someVersion");
287         Job mockedJob = createMockJob("user id", topic);
288         UUID uuid = broker.add(mockedJob);
289         assertEquals(uuid,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
290     }
291
292
293     @Test(dataProvider = "allTopics")
294     public void givenJobFromOtherBuild_pullJobs_noneIsPulled(Job.JobStatus topic) {
295         when(versionService.retrieveBuildNumber()).thenReturn("old");
296         Job mockedJob = createMockJob("user id", topic);
297         broker.add(mockedJob);
298         when(versionService.retrieveBuildNumber()).thenReturn("new");
299         assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
300     }
301
302     @Test
303     public void givenJobWithNullBuildAndJobWithRealBuild_pullJobs_jobsWithNonNullIsPulled() {
304         Job.JobStatus topic = PENDING;
305
306         //push job with null build
307         when(versionService.retrieveBuildNumber()).thenReturn(null);
308         broker.add(createMockJob("user id", topic));
309
310         //push job with "aBuild" build
311         when(versionService.retrieveBuildNumber()).thenReturn("aBuild");
312         UUID newJobId = broker.add(createMockJob("user id", topic));
313
314         //pull jobs while current build is still "aBuild". Only the non null build is pulled
315         assertEquals(newJobId,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
316
317         //no more jobs to pull
318         assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
319     }
320
321
322     @Test
323     public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
324         final List<Job> originalJobs = putALotOfJobs(broker);
325
326         MILLISECONDS.sleep(FEW);
327         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
328
329         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
330
331         MILLISECONDS.sleep(FEW);
332         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
333
334         pushBackJobAsync(broker, retrievedJob);
335
336         MILLISECONDS.sleep(FEW);
337         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
338     }
339
340     @Test
341     public void givenManyJobs_getThemAll_verifySameJobs() {
342         putAndGetALotOfJobs(broker);
343     }
344
345     @Test
346     public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
347         final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
348
349         pushBackJobs(retrievedJobs1, broker);
350         final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
351
352         assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
353     }
354
355     private static Date toDate(LocalDateTime localDateTime) {
356         return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
357     }
358
359     private void setModifiedDateToJob(UUID jobUuid, Date date) {
360         DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
361         job.setModified(date);
362         DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
363             session.saveOrUpdate(job);
364             return 1;
365         });
366     }
367
368
369     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
370         return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
371     }
372
373     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
374         JobDaoImpl job = new JobDaoImpl();
375         job.setUuid(UUID.randomUUID());
376         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
377         job.setIndexInBulk(indexInBulk);
378         job.setTemplateId(templateId);
379         job.setType(JobType.NoOp);
380         job.setStatus(status);
381         job.setTakenBy(takenBy);
382         job.setCreated(toDate(date));
383         job.setModified(toDate(date));
384         job.setUserId(userId);
385         if (deleted) {
386             job.setDeletedAt(new Date());
387         }
388         return job;
389     }
390
391     @DataProvider
392     public static Object[][] jobs(Method test) {
393         LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
394         UUID sameTemplate = UUID.randomUUID();
395         return new Object[][]{
396                 {ImmutableList.of(
397                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
398                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
399                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
400                         () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
401                         4,
402                         0,
403                         PENDING,
404                         "Broker should pull the first pending job by oldest date then by job index"
405                 },
406                 { ImmutableList.of(
407                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
408                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
409                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
410                         () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
411                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
412                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
413                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
414                         6,
415                         5,
416                         PENDING,
417                         "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
418                 },
419                 {ImmutableList.of(
420                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
421                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
422                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
423                         2,
424                         -1,
425                         PENDING,
426                         "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
427                 },
428                 {ImmutableList.of(
429                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
430                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
431                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
432                         2,
433                         -1,
434                         PENDING,
435                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
436                 },
437                 {ImmutableList.of(
438                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
439                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
440                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
441                         () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
442                 ),
443                         3,
444                         2,
445                         PENDING,
446                         "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
447                 },
448                 {ImmutableList.of(
449                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
450                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
451                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
452                         3,
453                         -1,
454                         PENDING,
455                         "Broker should not pull any job when there is another job from this template that was taken"
456                 },
457                 {ImmutableList.of(
458                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
459                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
460                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
461                         3,
462                         -1,
463                         PENDING,
464                         "Broker should not pull any job when there is another job from this template that in progress"
465                 },
466                 {ImmutableList.of(
467                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
468                         () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
469                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
470                         3,
471                         -1,
472                         PENDING,
473                         "Broker should not pull any job when there is another job from this template that was failed"
474                 },
475                 {ImmutableList.of(
476                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
477                         () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
478                         () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
479                         3,
480                         2,
481                         PENDING,
482                         "Broker should pull pending job when there is another job from this template that was deleted, although failed"
483                 },
484                 { ImmutableList.of(
485                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
486                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
487                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
488                         3,
489                         2,
490                         PENDING,
491                         "Broker should prioritize jobs of user that has no in-progress jobs"
492                 },
493                 {ImmutableList.of(
494                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
495                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
496                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
497                         3,
498                         2,
499                         PENDING,
500                         "Broker should prioritize jobs of user that has no taken jobs"
501                 },
502                 {ImmutableList.of(
503                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
504                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
505                         () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
506                         () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
507                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
508                         5,
509                         4,
510                         PENDING,
511                         "Broker should take oldest job when there is one in-progress job to each user"
512                 },
513                 {ImmutableList.of(
514                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
515                         () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
516                         () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
517                         2,
518                         -1,
519                         PENDING,
520                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
521                 },
522                 {ImmutableList.of(
523                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
524                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
525                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
526                         () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
527                         20,
528                         1,
529                         IN_PROGRESS,
530                         "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
531                 },
532                 {ImmutableList.of(
533                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
534                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
535                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
536                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
537                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
538                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
539                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
540                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
541                         () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
542                 ),
543                         20,
544                         6,
545                         IN_PROGRESS,
546                         "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
547                 },
548                 {ImmutableList.of(
549                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
550                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
551                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
552                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
553                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
554                         () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
555                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
556                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
557                         () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
558                 ),
559                         20,
560                         6,
561                         RESOURCE_IN_PROGRESS,
562                         "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
563                 },
564                 {ImmutableList.of(
565                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
566                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
567                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
568                         20,
569                         -1,
570                         IN_PROGRESS,
571                         "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
572                 },
573                 {ImmutableList.of(
574                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
575                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
576                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
577                         20,
578                         -1,
579                         RESOURCE_IN_PROGRESS,
580                         "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
581                 },
582                 {ImmutableList.of(
583                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
584                         () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
585                         () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
586                         1,
587                         2,
588                         CREATING,
589                         "Broker with creating topic should pull oldest creating job and ignore mso limit"
590                 },
591                 {ImmutableList.of(
592                         (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
593                         1,
594                         0,
595                         CREATING,
596                         "Broker with CREATING topic should pull CREATING job that was just modified"
597                 }
598
599         };
600     }
601
602     public interface Jobber {
603         // Will defer LocalDateTime.now() to test's "real-time"
604         JobDaoImpl toJob();
605     }
606
607     @Test(dataProvider = "jobs")
608     public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
609         JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService);
610         final List<JobDaoImpl> jobs = addJobsWithModifiedDate(jobbers, aBroker);
611         Optional<Job> nextJob = aBroker.pull(topic, UUID.randomUUID().toString());
612         boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
613         String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
614         Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
615         if (shouldAnyBeSelected) {
616             Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
617         }
618     }
619
620     @NotNull
621     protected List<JobDaoImpl> addJobsWithModifiedDate(List<Jobber> jobbers, JobsBrokerService broker) {
622         final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
623         return addJobsWithModifiedDateByJobDao(jobs, broker);
624     }
625
626     @NotNull
627     private List<JobDaoImpl> addJobsWithModifiedDateByJobDao(List<JobDaoImpl> jobs, JobsBrokerService broker) {
628         for (JobDaoImpl job : jobs) {
629             Date modifiedDate = job.getModified();
630             broker.add(job);
631             setModifiedDateToJob(job.getUuid(), modifiedDate);
632         }
633         return jobs;
634     }
635
636     @DataProvider
637     public static Object[][] jobsForTestingPendingResource(Method test) {
638         UUID templateId1 = UUID.fromString("311a9196-bbc5-47a1-8b11-bf0f9db1c7ca");
639         UUID templateId2 = UUID.fromString("4f1522f9-642e-49f7-af75-a2f344085bcc");
640         return new Object[][]{
641                 {ImmutableList.of( (Jobber)
642                                 () -> createNewJob(12, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
643                         () -> createNewJob(1, templateId2, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
644                         () -> createNewJob(2, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
645                         () -> createNewJob(3, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
646                         () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false)
647                 ),
648                         0,
649                         "given there is only one in the queue in PENDING_RESOURCE and no other job with same templateId, then this job is selected"
650                 },
651                 {ImmutableList.of( (Jobber)
652                                 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
653                         () -> createNewJob(3, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(2), false),
654                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
655                 ),
656                         2,
657                         "given multiple jobs with same templateId in PENDING_RESOURCE, then job with lowest indexInBulk is selected"
658                 },
659                 {ImmutableList.of( (Jobber)
660                                 () -> createNewJob(1, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
661                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
662                 ),
663                         1,
664                         "given multiple jobs with same indexInBulk, then job with lowest templateId is selected"
665                 },
666                 {ImmutableList.of( (Jobber)
667                                 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
668                         () -> createNewJob(2, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false)
669                 ),
670                         0,
671                         "given multiple jobs with different indexInBulk and different templateId, then job with lowest indexInBulk is selected"
672                 },
673                 {ImmutableList.of( (Jobber)
674                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
675                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false)
676                 ),
677                         -1,
678                         "given there is already taken job with same templateId, then no job is selected"
679                 },
680                 {ImmutableList.of( (Jobber)
681                                 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
682                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false),
683                         () -> createNewJob(9, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
684                         () -> createNewJob(8, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
685                 ),
686                         3,
687                         "given 4 jobs, 2 jobs templateId1 but one of them is taken, and 2 jobs with templateId2, then select job with templateId2"
688                 },
689                 {ImmutableList.of( (Jobber)
690                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
691                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), true)
692                 ),
693                         0,
694                         "given 2 jobs with same templateId, one of them is taken but deleted, then the other job is selected"
695                 },
696                 {ImmutableList.of( (Jobber)
697                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
698                         () -> createNewJob(1, templateId1, "userId", IN_PROGRESS, null, LocalDateTime.now(), false)
699                 ),
700                         -1,
701                         "given 2 jobs with same templateId, one of them is IN_PROGRESS, then no job is selected"
702                 },
703                 {ImmutableList.of( (Jobber)
704                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
705                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false)
706                 ),
707                         -1,
708                         "given 2 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS, then no job is selected"
709                 },
710                 {ImmutableList.of( (Jobber)
711                                 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
712                         () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
713                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), true)
714                 ),
715                         1,
716                         "given 3 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS but deleted, then other job with lowest indexInBulk is selected"
717                 },
718                 {ImmutableList.of( (Jobber)
719                                 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
720                         () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
721                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false),
722                         () -> createNewJob(12, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
723                         () -> createNewJob(11, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
724                 ),
725                         4,
726                         "given 5 jobs, 3 with templateId1 that one of them is RESOURCE_IN_PROGRESS,"+
727                                 "2 with templateId2 both in PENDING_RESOURCE, then job with lowest indexInBulk from templateId2 is selected"
728
729                 },
730                 {ImmutableList.of( (Jobber)
731                         () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), true)
732                 ),
733                         -1,
734                         "given 1 job in PENDING_RESOURCE but it's deleted, then no job is selected"
735                 },
736                 {ImmutableList.of( (Jobber)
737                                 () -> createNewJob(20, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
738                         () -> createNewJob(1, templateId1, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
739                         () -> createNewJob(2, templateId1, "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
740                         () -> createNewJob(3, templateId1, "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
741                         () -> createNewJob(4, templateId1, "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
742                         () -> createNewJob(5, templateId1, "userId", STOPPED, null, LocalDateTime.now().minusSeconds(1), false),
743                         () -> createNewJob(6, templateId1, "userId", PAUSE, null, LocalDateTime.now().minusSeconds(1), false)
744                 ),
745                         0,
746                         "given multiple jobs with same templateId, 1 in PENDING_RESOURCE, and other are not in progress, "+
747                                 "then the job in PENDING_RESOURCE is selected"
748                 },
749                 {ImmutableList.of( (Jobber)
750                                 () -> createNewJob(1, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
751                         () -> createNewJob(2, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
752                         () -> createNewJob(3, UUID.randomUUID(), "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
753                         () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
754                         () -> createNewJob(5, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
755                         () -> createNewJob(6, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false)
756                 ),
757                         -1,
758                         "given there is no job in PENDING_RESOURCE state, then no job is selected"
759                 },
760                 {ImmutableList.of( (Jobber)
761                         () -> createNewJob(6, null, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false)
762                 ),
763                         -1,
764                         "given there is 1 job in PENDING_RESOURCE but without templateId, then no job is selected"
765                 },
766         };
767     }
768
769     @Test(dataProvider = "jobsForTestingPendingResource")
770     public void givenSomeJobs_pullPendingResource_returnNextOrNothingAsExpected(List<Jobber> jobbers, int expectedIndexSelected, String assertionReason) {
771         givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(jobbers, 1, expectedIndexSelected, PENDING_RESOURCE, assertionReason);
772     }
773
774     public static JobDaoImpl createNewJob(Job.JobStatus status, String takenBy, long secondsOffset, boolean deleted) {
775         return createNewJob(1, UUID.randomUUID(), "af456", status, takenBy, LocalDateTime.now().minusSeconds(secondsOffset), deleted);
776     }
777
778     @Test
779     public void givenSomeJobs_deleteOldFinalJobs_onlyExpectedJobsAreDeleted() {
780         long seconds = 999;
781         final List<Pair<JobDaoImpl,Boolean>> jobs = ImmutableList.of(
782                 //not final
783                 Pair.of(createNewJob(IN_PROGRESS, null, seconds+1, false), true),
784                 Pair.of(createNewJob(RESOURCE_IN_PROGRESS, null, seconds+1, false), true),
785                 Pair.of(createNewJob(PENDING, null, seconds+1, false), true),
786                 Pair.of(createNewJob(CREATING, null, seconds+1, false), true),
787                 Pair.of(createNewJob(PENDING_RESOURCE, null, seconds+1, false), true),
788                 Pair.of(createNewJob(PAUSE, null, seconds+1, false), true),
789
790                 //final
791                 Pair.of(createNewJob(COMPLETED, null, seconds+1, false), false),
792                 Pair.of(createNewJob(FAILED, null, seconds+1, false), false),
793                 Pair.of(createNewJob(STOPPED, null, seconds+1, false), false),
794                 Pair.of(createNewJob(COMPLETED_WITH_ERRORS, null, seconds+1, true), false),
795                 Pair.of(createNewJob(COMPLETED_WITH_NO_ACTION, generateRandomAlphaNumeric(5), seconds+1, true), false),
796
797                 //final but not old
798                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-2, false), true),
799                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-400, false), true),
800                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), 0, false), true)
801         );
802         addJobsWithModifiedDateByJobDao(jobs.stream().map(Pair::getLeft).collect(Collectors.toList()), broker);
803         assertEquals(jobs.size(), broker.peek().size());
804
805         broker.deleteOldFinalJobs(seconds);
806         Stream<Pair<UUID, Job.JobStatus>> expectedJobs = jobs.stream()
807                 .filter(Pair::getRight)
808                 .map(x -> Pair.of(
809                         x.getLeft().getUuid(),
810                         x.getLeft().getStatus()
811                 ));
812         assertThat(broker.peek().stream().map(x->Pair.of(x.getUuid(), x.getStatus())).collect(Collectors.toList()),
813                 containsInAnyOrder(expectedJobs.toArray()));
814     }
815
816     @DataProvider
817     public Object[][] topics() {
818         return Arrays.stream(Job.JobStatus.values())
819                 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS, PENDING_RESOURCE).contains(t)))
820                 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
821     }
822
823     @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
824     public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
825         broker.pull(topic, UUID.randomUUID().toString());
826     }
827
828     @Test(expectedExceptions = NoJobException.class)
829     public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
830         Stream.of(Job.JobStatus.values())
831                 .filter(not(s -> s.equals(PENDING)))
832                 .map(s -> createMockJob("some user id", s))
833                 .map(job -> newJobAsync(broker, job))
834                 .map(this::waitForFutureJob)
835                 .collect(toList());
836
837         waitForFutureOptionalJob(pullJobAsync(broker));
838     }
839
840     @Test
841     public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
842         newJobAsync(broker); // this negated the expected result of the call below
843         givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
844     }
845
846     @Test(expectedExceptions = NoJobException.class)
847     public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
848         putAndGetALotOfJobs(broker);
849         waitForFutureOptionalJob(pullJobAsync(broker));
850     }
851
852     @Test(expectedExceptions = NoJobException.class)
853     public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
854         final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
855         assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
856         waitForFutureOptionalJob(futureOptionalJob);
857     }
858
859     @Test(expectedExceptions = IllegalStateException.class)
860     public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
861         waitForFutureJob(newJobAsync(broker));
862         waitForFutureJob(newJobAsync(broker));
863         waitForFutureOptionalJob(pullJobAsync(broker));
864
865         Job myJob = createMockJob("user id");
866         myJob.setUuid(UUID.randomUUID());
867
868         broker.pushBack(myJob); //Should fail
869     }
870
871     @Test
872     public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
873         final ImmutableMap<String, Object> randomDataForMostRecentJobType =
874                 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
875
876         waitForFutureJob(newJobAsync(broker));
877         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
878
879         job.setStatus(Job.JobStatus.PENDING);
880         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
881         job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
882         job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
883
884         broker.pushBack(job);
885         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
886
887         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
888         assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
889         assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
890     }
891
892     private static String jobDataReflected(Job job) {
893         return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
894                 .setExcludeFieldNames("created", "modified", "takenBy")
895                 .toString();
896     }
897
898     @Test(expectedExceptions = IllegalStateException.class)
899     public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
900         waitForFutureJob(newJobAsync(broker));
901         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
902
903         broker.pushBack(job);
904         broker.pushBack(job); //Should fail
905     }
906
907     @Test
908     public void addJob_PeekItById_verifySameJobWasPeeked() {
909         String userId = UUID.randomUUID().toString();
910         Job myJob = createMockJob(userId);
911         UUID uuid = broker.add(myJob);
912         Job peekedJob = broker.peek(uuid);
913         assertEquals("added testId is not the same as peeked TestsId",
914                 userId,
915                 peekedJob.getSharedData().getUserId());
916     }
917
918     @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
919     public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
920         final Job job = waitForFutureJob(newJobAsync(broker, status));
921         broker.delete(job.getUuid());
922         assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
923         waitForFutureOptionalJob(pullJobAsync(broker));
924     }
925
926     @DataProvider
927     public static Object[][] jobStatusesForSuccessDelete() {
928         return new Object[][]{
929                 {PENDING},
930                 {STOPPED}
931         };
932     }
933
934     @Test(
935             dataProvider = "jobStatusesForFailedDelete",
936             expectedExceptions = OperationNotAllowedException.class,
937             expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
938     )
939     public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
940         final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
941
942         if (taken) {
943             waitForFutureOptionalJob(pullJobAsync(broker));
944         }
945
946
947         broker.delete(job.getUuid());
948     }
949
950     @DataProvider
951     public static Object[][] jobStatusesForFailedDelete() {
952         return new Object[][]{
953                 {PENDING, true},
954                 {IN_PROGRESS, false},
955                 {COMPLETED, false},
956                 {PAUSE, false},
957                 {FAILED, false},
958         };
959     }
960
961     @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
962     public void deleteJob_notExist_exceptionIsThrown() {
963         waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
964         broker.delete(new UUID(111, 111));
965     }
966
967     public static class MockAsyncRequest implements AsyncJobRequest {
968         public String value;
969
970         public MockAsyncRequest() {}
971
972         public MockAsyncRequest(String value) {
973             this.value = value;
974         }
975
976         public String getValue() {
977             return value;
978         }
979     }
980
981     @Test
982     public void twoJobsWithSamePosition_bothJobsArePulled(){
983         UUID uuid = UUID.randomUUID();
984         int positionInBulk = RandomUtils.nextInt();
985         String userId = "userId";
986
987         Optional<Job> firstPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "first value");
988         Optional<Job> secondPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "second value");
989
990         MockAsyncRequest firstValue = (MockAsyncRequest) firstPulledJob.get().getSharedData().getRequest();
991         MockAsyncRequest secondValue = (MockAsyncRequest) secondPulledJob.get().getSharedData().getRequest();
992         assertThat(ImmutableList.of(firstValue.value, secondValue.value),
993             containsInAnyOrder("first value", "second value"));
994     }
995
996     private Optional<Job> createAddAndPullJob(UUID uuid, int positionInBulk, String userId, String s) {
997         JobDaoImpl job1 = createNewJob(positionInBulk, uuid, userId, CREATING, null,
998             LocalDateTime.now().minusSeconds(1), false);
999         job1.setSharedData(new JobSharedData(null, userId, new MockAsyncRequest(s), "testApi"));
1000         broker.add(job1);
1001         return broker.pull(CREATING, userId);
1002     }
1003 }