Implant vid-app-common org.onap.vid.job (main and test)
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.services;
22
23
24 import com.google.common.collect.ImmutableList;
25 import com.google.common.collect.ImmutableMap;
26 import org.apache.commons.lang.RandomStringUtils;
27 import org.apache.commons.lang3.RandomUtils;
28 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
29 import org.apache.commons.lang3.builder.ToStringStyle;
30 import org.apache.commons.lang3.tuple.Pair;
31 import org.apache.log4j.LogManager;
32 import org.apache.log4j.Logger;
33 import org.hibernate.SessionFactory;
34 import org.jetbrains.annotations.NotNull;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.onap.portalsdk.core.domain.support.DomainVo;
38 import org.onap.portalsdk.core.service.DataAccessService;
39 import org.onap.portalsdk.core.util.SystemProperties;
40 import org.onap.vid.exceptions.GenericUncheckedException;
41 import org.onap.vid.exceptions.OperationNotAllowedException;
42 import org.onap.vid.job.Job;
43 import org.onap.vid.job.JobAdapter;
44 import org.onap.vid.job.JobType;
45 import org.onap.vid.job.JobsBrokerService;
46 import org.onap.vid.job.command.JobCommandFactoryTest;
47 import org.onap.vid.job.impl.JobDaoImpl;
48 import org.onap.vid.job.impl.JobSchedulerInitializer;
49 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
50 import org.onap.vid.services.VersionService;
51 import org.onap.vid.utils.DaoUtils;
52 import org.onap.vid.config.DataSourceConfig;
53 import org.onap.vid.config.JobAdapterConfig;
54 import org.springframework.test.context.ContextConfiguration;
55 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
56 import org.testng.Assert;
57 import org.testng.annotations.AfterMethod;
58 import org.testng.annotations.BeforeMethod;
59 import org.testng.annotations.DataProvider;
60 import org.testng.annotations.Test;
61
62 import javax.inject.Inject;
63 import java.lang.reflect.Method;
64 import java.time.LocalDateTime;
65 import java.time.ZoneId;
66 import java.util.*;
67 import java.util.concurrent.*;
68 import java.util.stream.Collectors;
69 import java.util.stream.IntStream;
70 import java.util.stream.Stream;
71
72 import static java.util.concurrent.TimeUnit.MILLISECONDS;
73 import static java.util.stream.Collectors.toList;
74 import static org.hamcrest.CoreMatchers.equalTo;
75 import static org.hamcrest.CoreMatchers.is;
76 import static org.hamcrest.MatcherAssert.assertThat;
77 import static org.hamcrest.Matchers.both;
78 import static org.hamcrest.Matchers.containsInAnyOrder;
79 import static org.mockito.Mockito.when;
80 import static org.onap.vid.job.Job.JobStatus.*;
81 import static org.onap.vid.utils.Streams.not;
82 import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
83 import static org.testng.Assert.assertNotNull;
84 import static org.testng.AssertJUnit.assertEquals;
85 import static org.testng.AssertJUnit.assertFalse;
86
87 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
88 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
89
90     private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
91
92     private static final int JOBS_COUNT = 127;
93     private static final boolean DELETED = true;
94     private final ExecutorService executor = Executors.newFixedThreadPool(90);
95
96     private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
97
98     private final long FEW = 500;
99
100     private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
101     private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
102     private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
103     private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
104     private JobsBrokerService broker;
105
106     @Inject
107     JobAdapter jobAdapter;
108     @Inject
109     private DataAccessService dataAccessService;
110     @Inject
111     private SessionFactory sessionFactory;
112
113     @Mock
114     private VersionService versionService;
115
116     @AfterMethod
117     public void threadsCounter() {
118         logger.info("participating threads count: " + threadsIds.size());
119         threadsIds.clear();
120     }
121
122     @BeforeMethod
123     public void initializeBroker() {
124         MockitoAnnotations.initMocks(this);
125         when(versionService.retrieveBuildNumber()).thenReturn("aBuildNumber");
126         broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService);
127         ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
128     }
129
130     /*
131     - pulling jobs is limited to inserted ones
132     - putting back allows getting the job again
133     - multi threads safety
134     - any added job should be visible with view
135
136     - edges:
137         - pulling with empty repo should return empty optional
138         - pulling more than expected should return empty optional
139         - putting one, over-pulling from a different thread
140         - take before inserting, then insert while waiting
141
142      */
143
144     private class NoJobException extends RuntimeException {
145     }
146
147     private Future<Job> newJobAsync(JobsBrokerService b) {
148         return newJobAsync(b, createMockJob("user id"));
149     }
150
151     private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
152         return newJobAsync(b, createMockJob("user id", status));
153     }
154
155     private Job createMockJob(String userId) {
156         return jobAdapter.createServiceInstantiationJob(
157                 JobType.NoOp,
158                 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
159                 UUID.randomUUID(),
160                 userId,
161                 null,
162                 "optimisticUniqueServiceInstanceName",
163                 RandomUtils.nextInt());
164     }
165
166     private Job createMockJob(String userId, Job.JobStatus jobStatus) {
167         Job job = createMockJob(userId);
168         job.setStatus(jobStatus);
169         return job;
170     }
171
172     private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
173         final Future<Job> jobFuture = executor.submit(() -> {
174             accountThreadId();
175
176             b.add(job);
177
178             return job;
179         });
180         return jobFuture;
181     }
182
183     private void pushBackJobAsync(JobsBrokerService b, Job job) {
184         executor.submit(() -> {
185             accountThreadId();
186             b.pushBack(job);
187             return job;
188         });
189     }
190
191     private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
192         final Future<Optional<Job>> job = executor.submit(() -> {
193             accountThreadId();
194             // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
195             return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
196         });
197         return job;
198     }
199
200     private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
201         try {
202             return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
203         } catch (TimeoutException | InterruptedException | ExecutionException e) {
204             throw new RuntimeException(e);
205         }
206     }
207
208     private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
209         try {
210             return retrievedJobFuture.get(FEW, MILLISECONDS);
211         } catch (TimeoutException | InterruptedException | ExecutionException e) {
212             throw new RuntimeException(e);
213         }
214     }
215
216     private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
217         final List<Job> originalJobs = putALotOfJobs(broker);
218         final List<Job> retrievedJobs = getAlotOfJobs(broker);
219
220         assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
221
222         return retrievedJobs;
223     }
224
225     private List<Job> putALotOfJobs(JobsBrokerService broker) {
226         int n = JOBS_COUNT;
227         return IntStream.range(0, n)
228                 .mapToObj(i -> newJobAsync(broker))
229                 .map(this::waitForFutureJob)
230                 .collect(toList());
231     }
232
233     private List<Job> getAlotOfJobs(JobsBrokerService broker) {
234         int n = JOBS_COUNT;
235         return IntStream.range(0, n)
236                 .mapToObj(i -> pullJobAsync(broker))
237                 .map(this::waitForFutureOptionalJob)
238                 .collect(toList());
239     }
240
241     private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
242         jobs.forEach(job -> pushBackJobAsync(broker, job));
243     }
244
245     private void accountThreadId() {
246         threadsIds.add(Thread.currentThread().getId());
247     }
248
249     @Test
250     public void givenSingleJob_getIt_verifySameJob() {
251         final Job originalJob = waitForFutureJob(newJobAsync(broker));
252
253         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
254         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
255     }
256
257     @DataProvider
258     public static Object[][] allTopics() {
259         return JobSchedulerInitializer.WORKERS_TOPICS.stream()
260                 .map(topic -> new Object[] { topic })
261                 .toArray(Object[][]::new);
262     }
263
264     @Test(dataProvider = "allTopics")
265     public void givenJobFromSameBuild_pullJobs_jobIsPulled(Job.JobStatus topic) {
266         when(versionService.retrieveBuildNumber()).thenReturn("someVersion");
267         Job mockedJob = createMockJob("user id", topic);
268         UUID uuid = broker.add(mockedJob);
269         assertEquals(uuid,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
270     }
271
272
273     @Test(dataProvider = "allTopics")
274     public void givenJobFromOtherBuild_pullJobs_noneIsPulled(Job.JobStatus topic) {
275         when(versionService.retrieveBuildNumber()).thenReturn("old");
276         Job mockedJob = createMockJob("user id", topic);
277         broker.add(mockedJob);
278         when(versionService.retrieveBuildNumber()).thenReturn("new");
279         assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
280     }
281
282     @Test
283     public void givenJobWithNullBuildAndJobWithRealBuild_pullJobs_jobsWithNonNullIsPulled() {
284         Job.JobStatus topic = PENDING;
285
286         //push job with null build
287         when(versionService.retrieveBuildNumber()).thenReturn(null);
288         broker.add(createMockJob("user id", topic));
289
290         //push job with "aBuild" build
291         when(versionService.retrieveBuildNumber()).thenReturn("aBuild");
292         UUID newJobId = broker.add(createMockJob("user id", topic));
293
294         //pull jobs while current build is still "aBuild". Only the non null build is pulled
295         assertEquals(newJobId,  broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
296
297         //no more jobs to pull
298         assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
299     }
300
301
302     @Test
303     public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
304         final List<Job> originalJobs = putALotOfJobs(broker);
305
306         MILLISECONDS.sleep(FEW);
307         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
308
309         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
310
311         MILLISECONDS.sleep(FEW);
312         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
313
314         pushBackJobAsync(broker, retrievedJob);
315
316         MILLISECONDS.sleep(FEW);
317         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
318     }
319
320     @Test
321     public void givenManyJobs_getThemAll_verifySameJobs() {
322         putAndGetALotOfJobs(broker);
323     }
324
325     @Test
326     public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
327         final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
328
329         pushBackJobs(retrievedJobs1, broker);
330         final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
331
332         assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
333     }
334
335     private static Date toDate(LocalDateTime localDateTime) {
336         return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
337     }
338
339     private void setModifiedDateToJob(UUID jobUuid, Date date) {
340         DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
341         job.setModified(date);
342         DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
343             session.saveOrUpdate(job);
344             return 1;
345         });
346     }
347
348
349     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
350         return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
351     }
352
353     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
354         JobDaoImpl job = new JobDaoImpl();
355         job.setUuid(UUID.randomUUID());
356         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
357         job.setIndexInBulk(indexInBulk);
358         job.setTemplateId(templateId);
359         job.setType(JobType.NoOp);
360         job.setStatus(status);
361         job.setTakenBy(takenBy);
362         job.setCreated(toDate(date));
363         job.setModified(toDate(date));
364         job.setUserId(userId);
365         if (deleted) {
366             job.setDeletedAt(new Date());
367         }
368         return job;
369     }
370
371     @DataProvider
372     public static Object[][] jobs(Method test) {
373         LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
374         UUID sameTemplate = UUID.randomUUID();
375         return new Object[][]{
376                 {ImmutableList.of(
377                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
378                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
379                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
380                         () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
381                         4,
382                         0,
383                         PENDING,
384                         "Broker should pull the first pending job by oldest date then by job index"
385                 },
386                 { ImmutableList.of(
387                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
388                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
389                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
390                         () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
391                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
392                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
393                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
394                         6,
395                         5,
396                         PENDING,
397                         "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
398                 },
399                 {ImmutableList.of(
400                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
401                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
402                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
403                         2,
404                         -1,
405                         PENDING,
406                         "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
407                 },
408                 {ImmutableList.of(
409                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
410                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
411                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
412                         2,
413                         -1,
414                         PENDING,
415                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
416                 },
417                 {ImmutableList.of(
418                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
419                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
420                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
421                         () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
422                 ),
423                         3,
424                         2,
425                         PENDING,
426                         "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
427                 },
428                 {ImmutableList.of(
429                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
430                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
431                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
432                         3,
433                         -1,
434                         PENDING,
435                         "Broker should not pull any job when there is another job from this template that was taken"
436                 },
437                 {ImmutableList.of(
438                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
439                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
440                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
441                         3,
442                         -1,
443                         PENDING,
444                         "Broker should not pull any job when there is another job from this template that in progress"
445                 },
446                 {ImmutableList.of(
447                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
448                         () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
449                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
450                         3,
451                         -1,
452                         PENDING,
453                         "Broker should not pull any job when there is another job from this template that was failed"
454                 },
455                 {ImmutableList.of(
456                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
457                         () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
458                         () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
459                         3,
460                         2,
461                         PENDING,
462                         "Broker should pull pending job when there is another job from this template that was deleted, although failed"
463                 },
464                 { ImmutableList.of(
465                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
466                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
467                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
468                         3,
469                         2,
470                         PENDING,
471                         "Broker should prioritize jobs of user that has no in-progress jobs"
472                 },
473                 {ImmutableList.of(
474                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
475                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
476                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
477                         3,
478                         2,
479                         PENDING,
480                         "Broker should prioritize jobs of user that has no taken jobs"
481                 },
482                 {ImmutableList.of(
483                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
484                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
485                         () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
486                         () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
487                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
488                         5,
489                         4,
490                         PENDING,
491                         "Broker should take oldest job when there is one in-progress job to each user"
492                 },
493                 {ImmutableList.of(
494                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
495                         () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
496                         () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
497                         2,
498                         -1,
499                         PENDING,
500                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
501                 },
502                 {ImmutableList.of(
503                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
504                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
505                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
506                         () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
507                         20,
508                         1,
509                         IN_PROGRESS,
510                         "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
511                 },
512                 {ImmutableList.of(
513                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
514                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
515                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
516                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
517                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
518                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
519                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
520                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
521                         () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
522                 ),
523                         20,
524                         6,
525                         IN_PROGRESS,
526                         "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
527                 },
528                 {ImmutableList.of(
529                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
530                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
531                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
532                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
533                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
534                         () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
535                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
536                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
537                         () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
538                 ),
539                         20,
540                         6,
541                         RESOURCE_IN_PROGRESS,
542                         "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
543                 },
544                 {ImmutableList.of(
545                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
546                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
547                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
548                         20,
549                         -1,
550                         IN_PROGRESS,
551                         "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
552                 },
553                 {ImmutableList.of(
554                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
555                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
556                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
557                         20,
558                         -1,
559                         RESOURCE_IN_PROGRESS,
560                         "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
561                 },
562                 {ImmutableList.of(
563                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
564                         () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
565                         () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
566                         1,
567                         2,
568                         CREATING,
569                         "Broker with creating topic should pull oldest creating job and ignore mso limit"
570                 },
571                 {ImmutableList.of(
572                         (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
573                         1,
574                         0,
575                         CREATING,
576                         "Broker with CREATING topic should pull CREATING job that was just modified"
577                 }
578
579         };
580     }
581
582     public interface Jobber {
583         // Will defer LocalDateTime.now() to test's "real-time"
584         JobDaoImpl toJob();
585     }
586
587     @Test(dataProvider = "jobs")
588     public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
589         JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService);
590         final List<JobDaoImpl> jobs = addJobsWithModifiedDate(jobbers, aBroker);
591         Optional<Job> nextJob = aBroker.pull(topic, UUID.randomUUID().toString());
592         boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
593         String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
594         Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
595         if (shouldAnyBeSelected) {
596             Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
597         }
598     }
599
600     @NotNull
601     protected List<JobDaoImpl> addJobsWithModifiedDate(List<Jobber> jobbers, JobsBrokerService broker) {
602         final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
603         return addJobsWithModifiedDateByJobDao(jobs, broker);
604     }
605
606     @NotNull
607     private List<JobDaoImpl> addJobsWithModifiedDateByJobDao(List<JobDaoImpl> jobs, JobsBrokerService broker) {
608         for (JobDaoImpl job : jobs) {
609             Date modifiedDate = job.getModified();
610             broker.add(job);
611             setModifiedDateToJob(job.getUuid(), modifiedDate);
612         }
613         return jobs;
614     }
615
616     @DataProvider
617     public static Object[][] jobsForTestingPendingResource(Method test) {
618         UUID templateId1 = UUID.fromString("311a9196-bbc5-47a1-8b11-bf0f9db1c7ca");
619         UUID templateId2 = UUID.fromString("4f1522f9-642e-49f7-af75-a2f344085bcc");
620         return new Object[][]{
621                 {ImmutableList.of( (Jobber)
622                                 () -> createNewJob(12, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
623                         () -> createNewJob(1, templateId2, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
624                         () -> createNewJob(2, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
625                         () -> createNewJob(3, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
626                         () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false)
627                 ),
628                         0,
629                         "given there is only one in the queue in PENDING_RESOURCE and no other job with same templateId, then this job is selected"
630                 },
631                 {ImmutableList.of( (Jobber)
632                                 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
633                         () -> createNewJob(3, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(2), false),
634                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
635                 ),
636                         2,
637                         "given multiple jobs with same templateId in PENDING_RESOURCE, then job with lowest indexInBulk is selected"
638                 },
639                 {ImmutableList.of( (Jobber)
640                                 () -> createNewJob(1, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
641                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
642                 ),
643                         1,
644                         "given multiple jobs with same indexInBulk, then job with lowest templateId is selected"
645                 },
646                 {ImmutableList.of( (Jobber)
647                                 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
648                         () -> createNewJob(2, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false)
649                 ),
650                         0,
651                         "given multiple jobs with different indexInBulk and different templateId, then job with lowest indexInBulk is selected"
652                 },
653                 {ImmutableList.of( (Jobber)
654                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
655                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false)
656                 ),
657                         -1,
658                         "given there is already taken job with same templateId, then no job is selected"
659                 },
660                 {ImmutableList.of( (Jobber)
661                                 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
662                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false),
663                         () -> createNewJob(9, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
664                         () -> createNewJob(8, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
665                 ),
666                         3,
667                         "given 4 jobs, 2 jobs templateId1 but one of them is taken, and 2 jobs with templateId2, then select job with templateId2"
668                 },
669                 {ImmutableList.of( (Jobber)
670                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
671                         () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), true)
672                 ),
673                         0,
674                         "given 2 jobs with same templateId, one of them is taken but deleted, then the other job is selected"
675                 },
676                 {ImmutableList.of( (Jobber)
677                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
678                         () -> createNewJob(1, templateId1, "userId", IN_PROGRESS, null, LocalDateTime.now(), false)
679                 ),
680                         -1,
681                         "given 2 jobs with same templateId, one of them is IN_PROGRESS, then no job is selected"
682                 },
683                 {ImmutableList.of( (Jobber)
684                                 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
685                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false)
686                 ),
687                         -1,
688                         "given 2 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS, then no job is selected"
689                 },
690                 {ImmutableList.of( (Jobber)
691                                 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
692                         () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
693                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), true)
694                 ),
695                         1,
696                         "given 3 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS but deleted, then other job with lowest indexInBulk is selected"
697                 },
698                 {ImmutableList.of( (Jobber)
699                                 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
700                         () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
701                         () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false),
702                         () -> createNewJob(12, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
703                         () -> createNewJob(11, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
704                 ),
705                         4,
706                         "given 5 jobs, 3 with templateId1 that one of them is RESOURCE_IN_PROGRESS,"+
707                                 "2 with templateId2 both in PENDING_RESOURCE, then job with lowest indexInBulk from templateId2 is selected"
708
709                 },
710                 {ImmutableList.of( (Jobber)
711                         () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), true)
712                 ),
713                         -1,
714                         "given 1 job in PENDING_RESOURCE but it's deleted, then no job is selected"
715                 },
716                 {ImmutableList.of( (Jobber)
717                                 () -> createNewJob(20, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
718                         () -> createNewJob(1, templateId1, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
719                         () -> createNewJob(2, templateId1, "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
720                         () -> createNewJob(3, templateId1, "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
721                         () -> createNewJob(4, templateId1, "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
722                         () -> createNewJob(5, templateId1, "userId", STOPPED, null, LocalDateTime.now().minusSeconds(1), false),
723                         () -> createNewJob(6, templateId1, "userId", PAUSE, null, LocalDateTime.now().minusSeconds(1), false)
724                 ),
725                         0,
726                         "given multiple jobs with same templateId, 1 in PENDING_RESOURCE, and other are not in progress, "+
727                                 "then the job in PENDING_RESOURCE is selected"
728                 },
729                 {ImmutableList.of( (Jobber)
730                                 () -> createNewJob(1, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
731                         () -> createNewJob(2, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
732                         () -> createNewJob(3, UUID.randomUUID(), "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
733                         () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
734                         () -> createNewJob(5, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
735                         () -> createNewJob(6, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false)
736                 ),
737                         -1,
738                         "given there is no job in PENDING_RESOURCE state, then no job is selected"
739                 },
740                 {ImmutableList.of( (Jobber)
741                         () -> createNewJob(6, null, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false)
742                 ),
743                         -1,
744                         "given there is 1 job in PENDING_RESOURCE but without templateId, then no job is selected"
745                 },
746         };
747     }
748
749     @Test(dataProvider = "jobsForTestingPendingResource")
750     public void givenSomeJobs_pullPendingResource_returnNextOrNothingAsExpected(List<Jobber> jobbers, int expectedIndexSelected, String assertionReason) {
751         givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(jobbers, 1, expectedIndexSelected, PENDING_RESOURCE, assertionReason);
752     }
753
754     public static JobDaoImpl createNewJob(Job.JobStatus status, String takenBy, long secondsOffset, boolean deleted) {
755         return createNewJob(1, UUID.randomUUID(), "af456", status, takenBy, LocalDateTime.now().minusSeconds(secondsOffset), deleted);
756     }
757
758     @Test
759     public void givenSomeJobs_deleteOldFinalJobs_onlyExpectedJobsAreDeleted() {
760         long seconds = 999;
761         final List<Pair<JobDaoImpl,Boolean>> jobs = ImmutableList.of(
762                 //not final
763                 Pair.of(createNewJob(IN_PROGRESS, null, seconds+1, false), true),
764                 Pair.of(createNewJob(RESOURCE_IN_PROGRESS, null, seconds+1, false), true),
765                 Pair.of(createNewJob(PENDING, null, seconds+1, false), true),
766                 Pair.of(createNewJob(CREATING, null, seconds+1, false), true),
767                 Pair.of(createNewJob(PENDING_RESOURCE, null, seconds+1, false), true),
768                 Pair.of(createNewJob(PAUSE, null, seconds+1, false), true),
769
770                 //final
771                 Pair.of(createNewJob(COMPLETED, null, seconds+1, false), false),
772                 Pair.of(createNewJob(FAILED, null, seconds+1, false), false),
773                 Pair.of(createNewJob(STOPPED, null, seconds+1, false), false),
774                 Pair.of(createNewJob(COMPLETED_WITH_ERRORS, null, seconds+1, true), false),
775                 Pair.of(createNewJob(COMPLETED_WITH_NO_ACTION, generateRandomAlphaNumeric(5), seconds+1, true), false),
776
777                 //final but not old
778                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-2, false), true),
779                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-400, false), true),
780                 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), 0, false), true)
781         );
782         addJobsWithModifiedDateByJobDao(jobs.stream().map(Pair::getLeft).collect(Collectors.toList()), broker);
783         assertEquals(jobs.size(), broker.peek().size());
784
785         broker.deleteOldFinalJobs(seconds);
786         Stream<Pair<UUID, Job.JobStatus>> expectedJobs = jobs.stream()
787                 .filter(Pair::getRight)
788                 .map(x -> Pair.of(
789                         x.getLeft().getUuid(),
790                         x.getLeft().getStatus()
791                 ));
792         assertThat(broker.peek().stream().map(x->Pair.of(x.getUuid(), x.getStatus())).collect(Collectors.toList()),
793                 containsInAnyOrder(expectedJobs.toArray()));
794     }
795
796     @DataProvider
797     public Object[][] topics() {
798         return Arrays.stream(Job.JobStatus.values())
799                 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS, PENDING_RESOURCE).contains(t)))
800                 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
801     }
802
803     @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
804     public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
805         broker.pull(topic, UUID.randomUUID().toString());
806     }
807
808     @Test(expectedExceptions = NoJobException.class)
809     public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
810         Stream.of(Job.JobStatus.values())
811                 .filter(not(s -> s.equals(PENDING)))
812                 .map(s -> createMockJob("some user id", s))
813                 .map(job -> newJobAsync(broker, job))
814                 .map(this::waitForFutureJob)
815                 .collect(toList());
816
817         waitForFutureOptionalJob(pullJobAsync(broker));
818     }
819
820     @Test
821     public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
822         newJobAsync(broker); // this negated the expected result of the call below
823         givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
824     }
825
826     @Test(expectedExceptions = NoJobException.class)
827     public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
828         putAndGetALotOfJobs(broker);
829         waitForFutureOptionalJob(pullJobAsync(broker));
830     }
831
832     @Test(expectedExceptions = NoJobException.class)
833     public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
834         final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
835         assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
836         waitForFutureOptionalJob(futureOptionalJob);
837     }
838
839     @Test(expectedExceptions = IllegalStateException.class)
840     public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
841         waitForFutureJob(newJobAsync(broker));
842         waitForFutureJob(newJobAsync(broker));
843         waitForFutureOptionalJob(pullJobAsync(broker));
844
845         Job myJob = createMockJob("user id");
846         myJob.setUuid(UUID.randomUUID());
847
848         broker.pushBack(myJob); //Should fail
849     }
850
851     @Test
852     public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
853         final ImmutableMap<String, Object> randomDataForMostRecentJobType =
854                 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
855
856         waitForFutureJob(newJobAsync(broker));
857         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
858
859         job.setStatus(Job.JobStatus.PENDING);
860         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
861         job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
862         job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
863
864         broker.pushBack(job);
865         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
866
867         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
868         assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
869         assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
870     }
871
872     private static String jobDataReflected(Job job) {
873         return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
874                 .setExcludeFieldNames("created", "modified", "takenBy")
875                 .toString();
876     }
877
878     @Test(expectedExceptions = IllegalStateException.class)
879     public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
880         waitForFutureJob(newJobAsync(broker));
881         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
882
883         broker.pushBack(job);
884         broker.pushBack(job); //Should fail
885     }
886
887     @Test
888     public void addJob_PeekItById_verifySameJobWasPeeked() {
889         String userId = UUID.randomUUID().toString();
890         Job myJob = createMockJob(userId);
891         UUID uuid = broker.add(myJob);
892         Job peekedJob = broker.peek(uuid);
893         assertEquals("added testId is not the same as peeked TestsId",
894                 userId,
895                 peekedJob.getSharedData().getUserId());
896     }
897
898     @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
899     public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
900         final Job job = waitForFutureJob(newJobAsync(broker, status));
901         broker.delete(job.getUuid());
902         assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
903         waitForFutureOptionalJob(pullJobAsync(broker));
904     }
905
906     @DataProvider
907     public static Object[][] jobStatusesForSuccessDelete() {
908         return new Object[][]{
909                 {PENDING},
910                 {STOPPED}
911         };
912     }
913
914     @Test(
915             dataProvider = "jobStatusesForFailedDelete",
916             expectedExceptions = OperationNotAllowedException.class,
917             expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
918     )
919     public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
920         final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
921
922         if (taken) {
923             waitForFutureOptionalJob(pullJobAsync(broker));
924         }
925
926
927         broker.delete(job.getUuid());
928     }
929
930     @DataProvider
931     public static Object[][] jobStatusesForFailedDelete() {
932         return new Object[][]{
933                 {PENDING, true},
934                 {IN_PROGRESS, false},
935                 {COMPLETED, false},
936                 {PAUSE, false},
937                 {FAILED, false},
938         };
939     }
940
941     @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
942     public void deleteJob_notExist_exceptionIsThrown() {
943         waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
944         broker.delete(new UUID(111, 111));
945     }
946
947 }