Pause Upon VF Module Failure
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.services;
22
23
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.stream.Collectors.toList;
26 import static org.hamcrest.CoreMatchers.equalTo;
27 import static org.hamcrest.CoreMatchers.is;
28 import static org.hamcrest.MatcherAssert.assertThat;
29 import static org.hamcrest.Matchers.both;
30 import static org.hamcrest.Matchers.containsInAnyOrder;
31 import static org.mockito.Mockito.when;
32 import static org.onap.vid.job.Job.JobStatus.COMPLETED;
33 import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_ERRORS;
34 import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_NO_ACTION;
35 import static org.onap.vid.job.Job.JobStatus.CREATING;
36 import static org.onap.vid.job.Job.JobStatus.FAILED;
37 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
38 import static org.onap.vid.job.Job.JobStatus.PAUSE;
39 import static org.onap.vid.job.Job.JobStatus.PENDING;
40 import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
41 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
42 import static org.onap.vid.job.Job.JobStatus.STOPPED;
43 import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
44 import static org.onap.vid.utils.Streams.not;
45 import static org.testng.Assert.assertNotNull;
46 import static org.testng.AssertJUnit.assertEquals;
47 import static org.testng.AssertJUnit.assertFalse;
48
49 import com.google.common.collect.ImmutableList;
50 import com.google.common.collect.ImmutableMap;
51 import java.lang.reflect.Method;
52 import java.time.LocalDateTime;
53 import java.time.ZoneId;
54 import java.util.Arrays;
55 import java.util.Date;
56 import java.util.List;
57 import java.util.Optional;
58 import java.util.Set;
59 import java.util.UUID;
60 import java.util.concurrent.ConcurrentSkipListSet;
61 import java.util.concurrent.ExecutionException;
62 import java.util.concurrent.ExecutorService;
63 import java.util.concurrent.Executors;
64 import java.util.concurrent.Future;
65 import java.util.concurrent.TimeoutException;
66 import java.util.stream.Collectors;
67 import java.util.stream.IntStream;
68 import java.util.stream.Stream;
69 import javax.inject.Inject;
70 import org.apache.commons.lang.RandomStringUtils;
71 import org.apache.commons.lang3.RandomUtils;
72 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
73 import org.apache.commons.lang3.builder.ToStringStyle;
74 import org.apache.commons.lang3.tuple.Pair;
75 import org.apache.log4j.LogManager;
76 import org.apache.log4j.Logger;
77 import org.hibernate.SessionFactory;
78 import org.jetbrains.annotations.NotNull;
79 import org.mockito.Mock;
80 import org.mockito.MockitoAnnotations;
81 import org.onap.portalsdk.core.domain.support.DomainVo;
82 import org.onap.portalsdk.core.service.DataAccessService;
83 import org.onap.portalsdk.core.util.SystemProperties;
84 import org.onap.vid.config.DataSourceConfig;
85 import org.onap.vid.config.JobAdapterConfig;
86 import org.onap.vid.exceptions.GenericUncheckedException;
87 import org.onap.vid.exceptions.OperationNotAllowedException;
88 import org.onap.vid.job.Job;
89 import org.onap.vid.job.JobAdapter;
90 import org.onap.vid.job.JobAdapter.AsyncJobRequest;
91 import org.onap.vid.job.JobType;
92 import org.onap.vid.job.JobsBrokerService;
93 import org.onap.vid.job.command.JobCommandFactoryTest;
94 import org.onap.vid.job.impl.JobDaoImpl;
95 import org.onap.vid.job.impl.JobSchedulerInitializer;
96 import org.onap.vid.job.impl.JobSharedData;
97 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
98 import org.onap.vid.utils.DaoUtils;
99 import org.springframework.test.context.ContextConfiguration;
100 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
101 import org.testng.Assert;
102 import org.testng.annotations.AfterMethod;
103 import org.testng.annotations.BeforeMethod;
104 import org.testng.annotations.DataProvider;
105 import org.testng.annotations.Test;
106 import org.togglz.core.manager.FeatureManager;
107
108 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
109 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
110
111     private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
112
113     private static final int JOBS_COUNT = 127;
114     private static final boolean DELETED = true;
115     private final ExecutorService executor = Executors.newFixedThreadPool(90);
116
117     private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
118
119     private final long FEW = 1000;
120     private final long SOME = 2000;
121
122     private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
123     private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
124     private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
125     private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
126     private JobsBrokerService broker;
127
128     @Mock
129     private FeatureManager featureManager;
130
131     @Inject
132     JobAdapter jobAdapter;
133     @Inject
134     private DataAccessService dataAccessService;
135     @Inject
136     private SessionFactory sessionFactory;
137
138     @Mock
139     private VersionService versionService;
140
141     @AfterMethod
142     public void threadsCounter() {
143         logger.info("participating threads count: " + threadsIds.size());
144         threadsIds.clear();
145     }
146
147     @BeforeMethod
148     public void initializeBroker() {
149         MockitoAnnotations.initMocks(this);
150         when(versionService.retrieveBuildNumber()).thenReturn("aBuildNumber");
151         broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService, featureManager);
152         ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
153     }
154
155     /*
156     - pulling jobs is limited to inserted ones
157     - putting back allows getting the job again
158     - multi threads safety
159     - any added job should be visible with view
160
161     - edges:
162         - pulling with empty repo should return empty optional
163         - pulling more than expected should return empty optional
164         - putting one, over-pulling from a different thread
165         - take before inserting, then insert while waiting
166
167      */
168
169     private class NoJobException extends RuntimeException {
170     }
171
172     private Future<Job> newJobAsync(JobsBrokerService b) {
173         return newJobAsync(b, createMockJob("user id"));
174     }
175
176     private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
177         return newJobAsync(b, createMockJob("user id", status));
178     }
179
180     private Job createMockJob(String userId) {
181         return jobAdapter.createServiceInstantiationJob(
182                 JobType.NoOp,
183                 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
184                 UUID.randomUUID(),
185                 userId,
186                 null,
187                 "optimisticUniqueServiceInstanceName",
188                 RandomUtils.nextInt());
189     }
190
191     private Job createMockJob(String userId, Job.JobStatus jobStatus) {
192         Job job = createMockJob(userId);
193         job.setStatus(jobStatus);
194         return job;
195     }
196
197     private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
198         final Future<Job> jobFuture = executor.submit(() -> {
199             accountThreadId();
200
201             b.add(job);
202
203             return job;
204         });
205         return jobFuture;
206     }
207
208     private void pushBackJobAsync(JobsBrokerService b, Job job) {
209         executor.submit(() -> {
210             accountThreadId();
211             b.pushBack(job);
212             return job;
213         });
214     }
215
216     private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
217         final Future<Optional<Job>> job = executor.submit(() -> {
218             accountThreadId();
219             // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
220             return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
221         });
222         return job;
223     }
224
225     private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
226         try {
227             return retrievedOptionalJobFuture.get(SOME, MILLISECONDS).orElseThrow(NoJobException::new);
228         } catch (TimeoutException | InterruptedException | ExecutionException e) {
229             throw new RuntimeException(e);
230         }
231     }
232
233     private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
234         try {
235             return retrievedJobFuture.get(SOME, MILLISECONDS);
236         } catch (TimeoutException | InterruptedException | ExecutionException e) {
237             throw new RuntimeException(e);
238         }
239     }
240
241     private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
242         final List<Job> originalJobs = putALotOfJobs(broker);
243         final List<Job> retrievedJobs = getAlotOfJobs(broker);
244
245         assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
246
247         return retrievedJobs;
248     }
249
250     private List<Job> putALotOfJobs(JobsBrokerService broker) {
251         int n = JOBS_COUNT;
252         return IntStream.range(0, n)
253                 .mapToObj(i -> newJobAsync(broker))
254                 .map(this::waitForFutureJob)
255                 .collect(toList());
256     }
257
258     private List<Job> getAlotOfJobs(JobsBrokerService broker) {
259         int n = JOBS_COUNT;
260         return IntStream.range(0, n)
261                 .mapToObj(i -> pullJobAsync(broker))
262                 .map(this::waitForFutureOptionalJob)
263                 .collect(toList());
264     }
265
266     private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
267         jobs.forEach(job -> pushBackJobAsync(broker, job));
268     }
269
270     private void accountThreadId() {
271         threadsIds.add(Thread.currentThread().getId());
272     }
273
274     @Test
275     public void givenSingleJob_getIt_verifySameJob() {
276         final Job originalJob = waitForFutureJob(newJobAsync(broker));
277
278         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
279         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
280     }
281
282     @DataProvider
283     public static Object[][] allTopics() {
284         return JobSchedulerInitializer.WORKERS_TOPICS.stream()
285                 .map(topic -> new Object[] { topic })
286                 .toArray(Object[][]::new);
287     }
288
289     @Test(dataProvider = "allTopics")
290     public void givenJobFromSameBuild_pullJobs_jobIsPulled(Job.JobStatus topic) {
291         when(versionService.retrieveBuildNumber()).thenReturn("someVersion");
292         Job mockedJob = createMockJob("user id", topic);
293         UUID uuid = broker.add(mockedJob);
294         assertEquals(uuid,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
295     }
296
297
298     @Test(dataProvider = "allTopics")
299     public void givenJobFromOtherBuild_pullJobs_noneIsPulled(Job.JobStatus topic) {
300         when(versionService.retrieveBuildNumber()).thenReturn("old");
301         Job mockedJob = createMockJob("user id", topic);
302         broker.add(mockedJob);
303         when(versionService.retrieveBuildNumber()).thenReturn("new");
304         assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
305     }
306
307     @Test
308     public void givenJobWithNullBuildAndJobWithRealBuild_pullJobs_jobsWithNonNullIsPulled() {
309         Job.JobStatus topic = PENDING;
310
311         //push job with null build
312         when(versionService.retrieveBuildNumber()).thenReturn(null);
313         broker.add(createMockJob("user id", topic));
314
315         //push job with "aBuild" build
316         when(versionService.retrieveBuildNumber()).thenReturn("aBuild");
317         UUID newJobId = broker.add(createMockJob("user id", topic));
318
319         //pull jobs while current build is still "aBuild". Only the non null build is pulled
320         assertEquals(newJobId,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
321
322         //no more jobs to pull
323         assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
324     }
325
326
327     @Test
328     public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
329         final List<Job> originalJobs = putALotOfJobs(broker);
330
331         MILLISECONDS.sleep(FEW);
332         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
333
334         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
335
336         MILLISECONDS.sleep(FEW);
337         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
338
339         pushBackJobAsync(broker, retrievedJob);
340
341         MILLISECONDS.sleep(FEW);
342         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
343     }
344
345     @Test
346     public void givenManyJobs_getThemAll_verifySameJobs() {
347         putAndGetALotOfJobs(broker);
348     }
349
350     @Test
351     public void givenManyJobs_getThemAllThenPushBackAndGet_verifySameJobs() {
352         final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
353
354         pushBackJobs(retrievedJobs1, broker);
355         final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
356
357         assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
358     }
359
360     private static Date toDate(LocalDateTime localDateTime) {
361         return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
362     }
363
364     private void setModifiedDateToJob(UUID jobUuid, Date date) {
365         DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
366         job.setModified(date);
367         DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
368             session.saveOrUpdate(job);
369             return 1;
370         });
371     }
372
373
374     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
375         return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
376     }
377
378     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
379         JobDaoImpl job = new JobDaoImpl();
380         job.setUuid(UUID.randomUUID());
381         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
382         job.setIndexInBulk(indexInBulk);
383         job.setTemplateId(templateId);
384         job.setType(JobType.NoOp);
385         job.setStatus(status);
386         job.setTakenBy(takenBy);
387         job.setCreated(toDate(date));
388         job.setModified(toDate(date));
389         job.setUserId(userId);
390         if (deleted) {
391             job.setDeletedAt(new Date());
392         }
393         return job;
394     }
395
396     @DataProvider
397     public static Object[][] jobs(Method test) {
398         LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
399         UUID sameTemplate = UUID.randomUUID();
400         return new Object[][]{
401                 {ImmutableList.of(
402                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
403                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
404                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
405                         () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
406                         4,
407                         0,
408                         PENDING,
409                         "Broker should pull the first pending job by oldest date then by job index"
410                 },
411                 { ImmutableList.of(
412                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
413                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
414                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
415                         () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
416                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
417                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
418                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
419                         6,
420                         5,
421                         PENDING,
422                         "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
423                 },
424                 {ImmutableList.of(
425                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
426                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
427                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
428                         2,
429                         -1,
430                         PENDING,
431                         "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
432                 },
433                 {ImmutableList.of(
434                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
435                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
436                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
437                         2,
438                         -1,
439                         PENDING,
440                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
441                 },
442                 {ImmutableList.of(
443                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
444                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
445                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
446                         () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
447                 ),
448                         3,
449                         2,
450                         PENDING,
451                         "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
452                 },
453                 {ImmutableList.of(
454                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
455                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
456                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
457                         3,
458                         -1,
459                         PENDING,
460                         "Broker should not pull any job when there is another job from this template that was taken"
461                 },
462                 {ImmutableList.of(
463                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
464                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
465                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
466                         3,
467                         -1,
468                         PENDING,
469                         "Broker should not pull any job when there is another job from this template that in progress"
470                 },
471                 {ImmutableList.of(
472                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
473                         () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
474                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
475                         3,
476                         -1,
477                         PENDING,
478                         "Broker should not pull any job when there is another job from this template that was failed"
479                 },
480                 {ImmutableList.of(
481                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
482                         () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
483                         () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
484                         3,
485                         2,
486                         PENDING,
487                         "Broker should pull pending job when there is another job from this template that was deleted, although failed"
488                 },
489                 { ImmutableList.of(
490                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
491                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
492                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
493                         3,
494                         2,
495                         PENDING,
496                         "Broker should prioritize jobs of user that has no in-progress jobs"
497                 },
498                 {ImmutableList.of(
499                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
500                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
501                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
502                         3,
503                         2,
504                         PENDING,
505                         "Broker should prioritize jobs of user that has no taken jobs"
506                 },
507                 {ImmutableList.of(
508                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
509                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
510                         () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
511                         () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
512                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
513                         5,
514                         4,
515                         PENDING,
516                         "Broker should take oldest job when there is one in-progress job to each user"
517                 },
518                 {ImmutableList.of(
519                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
520                         () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
521                         () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
522                         2,
523                         -1,
524                         PENDING,
525                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
526                 },
527                 {ImmutableList.of(
528                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
529                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
530                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
531                         () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
532                         20,
533                         1,
534                         IN_PROGRESS,
535                         "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
536                 },
537                 {ImmutableList.of(
538                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
539                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
540                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
541                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
542                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
543                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
544                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
545                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
546                         () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
547                 ),
548                         20,
549                         6,
550                         IN_PROGRESS,
551                         "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
552                 },
553                 {ImmutableList.of(
554                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
555                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
556                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
557                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
558                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
559                         () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
560                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
561                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
562                         () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
563                 ),
564                         20,
565                         6,
566                         RESOURCE_IN_PROGRESS,
567                         "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
568                 },
569                 {ImmutableList.of(
570                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
571                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
572                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
573                         20,
574                         -1,
575                         IN_PROGRESS,
576                         "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
577                 },
578                 {ImmutableList.of(
579                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
580                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
581                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
582                         20,
583                         -1,
584                         RESOURCE_IN_PROGRESS,
585                         "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
586                 },
587                 {ImmutableList.of(
588                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
589                         () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
590                         () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
591                         1,
592                         2,
593                         CREATING,
594                         "Broker with creating topic should pull oldest creating job and ignore mso limit"
595                 },
596                 {ImmutableList.of(
597                         (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
598                         1,
599                         0,
600                         CREATING,
601                         "Broker with CREATING topic should pull CREATING job that was just modified"
602                 }
603
604         };
605     }
606
607     public interface Jobber {
608         // Will defer LocalDateTime.now() to test's "real-time"
609         JobDaoImpl toJob();
610     }
611
612     @Test(dataProvider = "jobs")
613     public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
614         JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService, featureManager);
615         final List<JobDaoImpl> jobs = addJobsWithModifiedDate(jobbers, aBroker);
616         Optional<Job> nextJob = aBroker.pull(topic, UUID.randomUUID().toString());
617         boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
618         String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
619         Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
620         if (shouldAnyBeSelected) {
621             Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
622         }
623     }
624
625     @NotNull
626     protected List<JobDaoImpl> addJobsWithModifiedDate(List<Jobber> jobbers, JobsBrokerService broker) {
627         final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
628         return addJobsWithModifiedDateByJobDao(jobs, broker);
629     }
630
631     @NotNull
632     private List<JobDaoImpl> addJobsWithModifiedDateByJobDao(List<JobDaoImpl> jobs, JobsBrokerService broker) {
633         for (JobDaoImpl job : jobs) {
634             Date modifiedDate = job.getModified();
635             broker.add(job);
636             setModifiedDateToJob(job.getUuid(), modifiedDate);
637         }
638         return jobs;
639     }
640
641     @DataProvider
642     public static Object[][] jobsForTestingPendingResource(Method test) {
643         UUID templateId1 = UUID.fromString("311a9196-bbc5-47a1-8b11-bf0f9db1c7ca");
644         UUID templateId2 = UUID.fromString("4f1522f9-642e-49f7-af75-a2f344085bcc");
645         return new Object[][]{
646                 {ImmutableList.of( (Jobber)
647                                 () -> createNewJob(12, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
648                         () -> createNewJob(1, templateId2, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
649                         () -> createNewJob(2, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
650                         () -> createNewJob(3, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
651                         () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false)
652                 ),
653                         0,
654                         "given there is only one in the queue in PENDING_RESOURCE and no other job with same templateId, then this job is selected"
655                 },
656                 {ImmutableList.of( (Jobber)
657                                 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
658                         () -> createNewJob(3, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(2), false),
659                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
660                 ),
661                         2,
662                         "given multiple jobs with same templateId in PENDING_RESOURCE, then job with lowest indexInBulk is selected"
663                 },
664                 {ImmutableList.of( (Jobber)
665                                 () -> createNewJob(1, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
666                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
667                 ),
668                         1,
669                         "given multiple jobs with same indexInBulk, then job with lowest templateId is selected"
670                 },
671                 {ImmutableList.of( (Jobber)
672                                 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
673                         () -> createNewJob(2, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false)
674                 ),
675                         0,
676                         "given multiple jobs with different indexInBulk and different templateId, then job with lowest indexInBulk is selected"
677                 },
678                 {ImmutableList.of( (Jobber)
679                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
680                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false)
681                 ),
682                         -1,
683                         "given there is already taken job with same templateId, then no job is selected"
684                 },
685                 {ImmutableList.of( (Jobber)
686                                 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
687                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false),
688                         () -> createNewJob(9, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
689                         () -> createNewJob(8, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
690                 ),
691                         3,
692                         "given 4 jobs, 2 jobs templateId1 but one of them is taken, and 2 jobs with templateId2, then select job with templateId2"
693                 },
694                 {ImmutableList.of( (Jobber)
695                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
696                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), true)
697                 ),
698                         0,
699                         "given 2 jobs with same templateId, one of them is taken but deleted, then the other job is selected"
700                 },
701                 {ImmutableList.of( (Jobber)
702                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
703                         () -> createNewJob(1, templateId1, "userId", IN_PROGRESS, null, LocalDateTime.now(), false)
704                 ),
705                         -1,
706                         "given 2 jobs with same templateId, one of them is IN_PROGRESS, then no job is selected"
707                 },
708                 {ImmutableList.of( (Jobber)
709                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
710                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false)
711                 ),
712                         -1,
713                         "given 2 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS, then no job is selected"
714                 },
715                 {ImmutableList.of( (Jobber)
716                                 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
717                         () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
718                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), true)
719                 ),
720                         1,
721                         "given 3 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS but deleted, then other job with lowest indexInBulk is selected"
722                 },
723                 {ImmutableList.of( (Jobber)
724                                 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
725                         () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
726                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false),
727                         () -> createNewJob(12, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
728                         () -> createNewJob(11, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
729                 ),
730                         4,
731                         "given 5 jobs, 3 with templateId1 that one of them is RESOURCE_IN_PROGRESS,"+
732                                 "2 with templateId2 both in PENDING_RESOURCE, then job with lowest indexInBulk from templateId2 is selected"
733
734                 },
735                 {ImmutableList.of( (Jobber)
736                         () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), true)
737                 ),
738                         -1,
739                         "given 1 job in PENDING_RESOURCE but it's deleted, then no job is selected"
740                 },
741                 {ImmutableList.of( (Jobber)
742                                 () -> createNewJob(20, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
743                         () -> createNewJob(1, templateId1, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
744                         () -> createNewJob(2, templateId1, "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
745                         () -> createNewJob(3, templateId1, "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
746                         () -> createNewJob(4, templateId1, "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
747                         () -> createNewJob(5, templateId1, "userId", STOPPED, null, LocalDateTime.now().minusSeconds(1), false),
748                         () -> createNewJob(6, templateId1, "userId", PAUSE, null, LocalDateTime.now().minusSeconds(1), false)
749                 ),
750                         0,
751                         "given multiple jobs with same templateId, 1 in PENDING_RESOURCE, and other are not in progress, "+
752                                 "then the job in PENDING_RESOURCE is selected"
753                 },
754                 {ImmutableList.of( (Jobber)
755                                 () -> createNewJob(1, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
756                         () -> createNewJob(2, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
757                         () -> createNewJob(3, UUID.randomUUID(), "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
758                         () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
759                         () -> createNewJob(5, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
760                         () -> createNewJob(6, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false)
761                 ),
762                         -1,
763                         "given there is no job in PENDING_RESOURCE state, then no job is selected"
764                 },
765                 {ImmutableList.of( (Jobber)
766                         () -> createNewJob(6, null, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false)
767                 ),
768                         -1,
769                         "given there is 1 job in PENDING_RESOURCE but without templateId, then no job is selected"
770                 },
771         };
772     }
773
774     @Test(dataProvider = "jobsForTestingPendingResource")
775     public void givenSomeJobs_pullPendingResource_returnNextOrNothingAsExpected(List<Jobber> jobbers, int expectedIndexSelected, String assertionReason) {
776         givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(jobbers, 1, expectedIndexSelected, PENDING_RESOURCE, assertionReason);
777     }
778
779     public static JobDaoImpl createNewJob(Job.JobStatus status, String takenBy, long secondsOffset, boolean deleted) {
780         return createNewJob(1, UUID.randomUUID(), "af456", status, takenBy, LocalDateTime.now().minusSeconds(secondsOffset), deleted);
781     }
782
783     @Test
784     public void givenSomeJobs_deleteOldFinalJobs_onlyExpectedJobsAreDeleted() {
785         long seconds = 999;
786         final List<Pair<JobDaoImpl,Boolean>> jobs = ImmutableList.of(
787                 //not final
788                 Pair.of(createNewJob(IN_PROGRESS, null, seconds+1, false), true),
789                 Pair.of(createNewJob(RESOURCE_IN_PROGRESS, null, seconds+1, false), true),
790                 Pair.of(createNewJob(PENDING, null, seconds+1, false), true),
791                 Pair.of(createNewJob(CREATING, null, seconds+1, false), true),
792                 Pair.of(createNewJob(PENDING_RESOURCE, null, seconds+1, false), true),
793                 Pair.of(createNewJob(PAUSE, null, seconds+1, false), true),
794
795                 //final
796                 Pair.of(createNewJob(COMPLETED, null, seconds+1, false), false),
797                 Pair.of(createNewJob(FAILED, null, seconds+1, false), false),
798                 Pair.of(createNewJob(STOPPED, null, seconds+1, false), false),
799                 Pair.of(createNewJob(COMPLETED_WITH_ERRORS, null, seconds+1, true), false),
800                 Pair.of(createNewJob(COMPLETED_WITH_NO_ACTION, generateRandomAlphaNumeric(5), seconds+1, true), false),
801
802                 //final but not old
803                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-2, false), true),
804                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-400, false), true),
805                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), 0, false), true)
806         );
807         addJobsWithModifiedDateByJobDao(jobs.stream().map(Pair::getLeft).collect(Collectors.toList()), broker);
808         assertEquals(jobs.size(), broker.peek().size());
809
810         broker.deleteOldFinalJobs(seconds);
811         Stream<Pair<UUID, Job.JobStatus>> expectedJobs = jobs.stream()
812                 .filter(Pair::getRight)
813                 .map(x -> Pair.of(
814                         x.getLeft().getUuid(),
815                         x.getLeft().getStatus()
816                 ));
817         assertThat(broker.peek().stream().map(x->Pair.of(x.getUuid(), x.getStatus())).collect(Collectors.toList()),
818                 containsInAnyOrder(expectedJobs.toArray()));
819     }
820
821     @DataProvider
822     public Object[][] topics() {
823         return Arrays.stream(Job.JobStatus.values())
824                 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS, PENDING_RESOURCE).contains(t)))
825                 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
826     }
827
828     @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
829     public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
830         broker.pull(topic, UUID.randomUUID().toString());
831     }
832
833     @Test(expectedExceptions = NoJobException.class)
834     public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
835         Stream.of(Job.JobStatus.values())
836                 .filter(not(s -> s.equals(PENDING)))
837                 .map(s -> createMockJob("some user id", s))
838                 .map(job -> newJobAsync(broker, job))
839                 .map(this::waitForFutureJob)
840                 .collect(toList());
841
842         waitForFutureOptionalJob(pullJobAsync(broker));
843     }
844
845     @Test
846     public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
847         newJobAsync(broker); // this negated the expected result of the call below
848         givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
849     }
850
851     @Test(expectedExceptions = NoJobException.class)
852     public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
853         putAndGetALotOfJobs(broker);
854         waitForFutureOptionalJob(pullJobAsync(broker));
855     }
856
857     @Test(expectedExceptions = NoJobException.class)
858     public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
859         final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
860         assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
861         waitForFutureOptionalJob(futureOptionalJob);
862     }
863
864     @Test(expectedExceptions = IllegalStateException.class)
865     public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
866         waitForFutureJob(newJobAsync(broker));
867         waitForFutureJob(newJobAsync(broker));
868         waitForFutureOptionalJob(pullJobAsync(broker));
869
870         Job myJob = createMockJob("user id");
871         myJob.setUuid(UUID.randomUUID());
872
873         broker.pushBack(myJob); //Should fail
874     }
875
876     @Test
877     public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
878         final ImmutableMap<String, Object> randomDataForMostRecentJobType =
879                 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
880
881         waitForFutureJob(newJobAsync(broker));
882         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
883
884         job.setStatus(Job.JobStatus.PENDING);
885         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
886         job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
887         job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
888
889         broker.pushBack(job);
890         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
891
892         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
893         assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
894         assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
895     }
896
897     private static String jobDataReflected(Job job) {
898         return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
899                 .setExcludeFieldNames("created", "modified", "takenBy")
900                 .toString();
901     }
902
903     @Test(expectedExceptions = IllegalStateException.class)
904     public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
905         waitForFutureJob(newJobAsync(broker));
906         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
907
908         broker.pushBack(job);
909         broker.pushBack(job); //Should fail
910     }
911
912     @Test
913     public void addJob_PeekItById_verifySameJobWasPeeked() {
914         String userId = UUID.randomUUID().toString();
915         Job myJob = createMockJob(userId);
916         UUID uuid = broker.add(myJob);
917         Job peekedJob = broker.peek(uuid);
918         assertEquals("added testId is not the same as peeked TestsId",
919                 userId,
920                 peekedJob.getSharedData().getUserId());
921     }
922
923     @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
924     public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
925         final Job job = waitForFutureJob(newJobAsync(broker, status));
926         broker.delete(job.getUuid());
927         assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
928         waitForFutureOptionalJob(pullJobAsync(broker));
929     }
930
931     @DataProvider
932     public static Object[][] jobStatusesForSuccessDelete() {
933         return new Object[][]{
934                 {PENDING},
935                 {STOPPED}
936         };
937     }
938
939     @Test(
940             dataProvider = "jobStatusesForFailedDelete",
941             expectedExceptions = OperationNotAllowedException.class,
942             expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
943     )
944     public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
945         final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
946
947         if (taken) {
948             waitForFutureOptionalJob(pullJobAsync(broker));
949         }
950
951
952         broker.delete(job.getUuid());
953     }
954
955     @DataProvider
956     public static Object[][] jobStatusesForFailedDelete() {
957         return new Object[][]{
958                 {PENDING, true},
959                 {IN_PROGRESS, false},
960                 {COMPLETED, false},
961                 {PAUSE, false},
962                 {FAILED, false},
963         };
964     }
965
966     @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
967     public void deleteJob_notExist_exceptionIsThrown() {
968         waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
969         broker.delete(new UUID(111, 111));
970     }
971
972     public static class MockAsyncRequest implements AsyncJobRequest {
973         public String value;
974
975         public MockAsyncRequest() {}
976
977         public MockAsyncRequest(String value) {
978             this.value = value;
979         }
980
981         public String getValue() {
982             return value;
983         }
984     }
985
986     @Test
987     public void twoJobsWithSamePosition_bothJobsArePulled(){
988         UUID uuid = UUID.randomUUID();
989         int positionInBulk = RandomUtils.nextInt();
990         String userId = "userId";
991
992         Optional<Job> firstPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "first value");
993         Optional<Job> secondPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "second value");
994
995         MockAsyncRequest firstValue = (MockAsyncRequest) firstPulledJob.get().getSharedData().getRequest();
996         MockAsyncRequest secondValue = (MockAsyncRequest) secondPulledJob.get().getSharedData().getRequest();
997         assertThat(ImmutableList.of(firstValue.value, secondValue.value),
998             containsInAnyOrder("first value", "second value"));
999     }
1000
1001     private Optional<Job> createAddAndPullJob(UUID uuid, int positionInBulk, String userId, String s) {
1002         JobDaoImpl job1 = createNewJob(positionInBulk, uuid, userId, CREATING, null,
1003             LocalDateTime.now().minusSeconds(1), false);
1004         job1.setSharedData(new JobSharedData(null, userId, new MockAsyncRequest(s), "testApi"));
1005         broker.add(job1);
1006         return broker.pull(CREATING, userId);
1007     }
1008 }