Merge from ECOMP's repository
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
1 package org.onap.vid.services;
2
3
4 import com.google.common.collect.ImmutableList;
5 import com.google.common.collect.ImmutableMap;
6 import org.apache.commons.lang.RandomStringUtils;
7 import org.apache.commons.lang3.RandomUtils;
8 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
9 import org.apache.commons.lang3.builder.ToStringStyle;
10 import org.apache.log4j.LogManager;
11 import org.apache.log4j.Logger;
12 import org.hibernate.SessionFactory;
13 import org.onap.portalsdk.core.domain.support.DomainVo;
14 import org.onap.portalsdk.core.service.DataAccessService;
15 import org.onap.portalsdk.core.util.SystemProperties;
16 import org.onap.vid.config.DataSourceConfig;
17 import org.onap.vid.config.JobAdapterConfig;
18 import org.onap.vid.exceptions.GenericUncheckedException;
19 import org.onap.vid.exceptions.OperationNotAllowedException;
20 import org.onap.vid.job.Job;
21 import org.onap.vid.job.JobAdapter;
22 import org.onap.vid.job.JobType;
23 import org.onap.vid.job.JobsBrokerService;
24 import org.onap.vid.job.command.JobCommandFactoryTest;
25 import org.onap.vid.job.impl.JobDaoImpl;
26 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
27 import org.onap.vid.utils.DaoUtils;
28 import org.springframework.test.context.ContextConfiguration;
29 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
30 import org.testng.Assert;
31 import org.testng.annotations.AfterMethod;
32 import org.testng.annotations.BeforeMethod;
33 import org.testng.annotations.DataProvider;
34 import org.testng.annotations.Test;
35
36 import javax.inject.Inject;
37 import java.lang.reflect.Method;
38 import java.time.LocalDateTime;
39 import java.time.ZoneId;
40 import java.util.*;
41 import java.util.concurrent.*;
42 import java.util.stream.IntStream;
43 import java.util.stream.Stream;
44
45 import static java.util.concurrent.TimeUnit.MILLISECONDS;
46 import static java.util.stream.Collectors.toList;
47 import static org.hamcrest.CoreMatchers.equalTo;
48 import static org.hamcrest.CoreMatchers.is;
49 import static org.hamcrest.MatcherAssert.assertThat;
50 import static org.hamcrest.Matchers.both;
51 import static org.hamcrest.Matchers.containsInAnyOrder;
52 import static org.onap.vid.job.Job.JobStatus.*;
53 import static org.onap.vid.utils.Streams.not;
54 import static org.testng.Assert.assertNotNull;
55 import static org.testng.AssertJUnit.assertEquals;
56
57 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
58 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
59
60     private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
61
62     private static final int JOBS_COUNT = 127;
63     private static final boolean DELETED = true;
64     private final ExecutorService executor = Executors.newFixedThreadPool(90);
65
66     private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
67
68     private final long FEW = 500;
69
70     private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
71     private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
72     private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
73     private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
74     private JobsBrokerService broker;
75
76     @Inject
77     JobAdapter jobAdapter;
78     @Inject
79     private DataAccessService dataAccessService;
80     @Inject
81     private SessionFactory sessionFactory;
82
83     /*
84     - pulling jobs is limited to inserted ones
85     - putting back allows getting the job again
86     - multi threads safety
87     - any added job should be visible with view
88
89     - edges:
90         - pulling with empty repo should return empty optional
91         - pulling more than expected should return empty optional
92         - putting one, over-pulling from a different thread
93         - take before inserting, then insert while waiting
94
95      */
96
97     private class NoJobException extends RuntimeException {
98     }
99
100     private Future<Job> newJobAsync(JobsBrokerService b) {
101         return newJobAsync(b, createMockJob("user id"));
102     }
103
104     private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
105         return newJobAsync(b, createMockJob("user id", status));
106     }
107
108     private Job createMockJob(String userId) {
109         return jobAdapter.createServiceInstantiationJob(
110                 JobType.NoOp,
111                 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
112                 UUID.randomUUID(),
113                 userId,
114                 "optimisticUniqueServiceInstanceName",
115                 RandomUtils.nextInt());
116     }
117
118     private Job createMockJob(String userId, Job.JobStatus jobStatus) {
119         Job job = createMockJob(userId);
120         job.setStatus(jobStatus);
121         return job;
122     }
123
124     private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
125         final Future<Job> jobFuture = executor.submit(() -> {
126             accountThreadId();
127
128             b.add(job);
129
130             return job;
131         });
132         return jobFuture;
133     }
134
135     private void pushBackJobAsync(JobsBrokerService b, Job job) {
136         executor.submit(() -> {
137             accountThreadId();
138             b.pushBack(job);
139             return job;
140         });
141     }
142
143     private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
144         final Future<Optional<Job>> job = executor.submit(() -> {
145             accountThreadId();
146             // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
147             return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
148         });
149         return job;
150     }
151
152     private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
153         try {
154             return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
155         } catch (TimeoutException | InterruptedException | ExecutionException e) {
156             throw new RuntimeException(e);
157         }
158     }
159
160     private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
161         try {
162             return retrievedJobFuture.get(FEW, MILLISECONDS);
163         } catch (TimeoutException | InterruptedException | ExecutionException e) {
164             throw new RuntimeException(e);
165         }
166     }
167
168     private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
169         final List<Job> originalJobs = putALotOfJobs(broker);
170         final List<Job> retrievedJobs = getAlotOfJobs(broker);
171
172         assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
173
174         return retrievedJobs;
175     }
176
177     private List<Job> putALotOfJobs(JobsBrokerService broker) {
178         int n = JOBS_COUNT;
179         return IntStream.range(0, n)
180                 .mapToObj(i -> newJobAsync(broker))
181                 .map(this::waitForFutureJob)
182                 .collect(toList());
183     }
184
185     private List<Job> getAlotOfJobs(JobsBrokerService broker) {
186         int n = JOBS_COUNT;
187         return IntStream.range(0, n)
188                 .mapToObj(i -> pullJobAsync(broker))
189                 .map(this::waitForFutureOptionalJob)
190                 .collect(toList());
191     }
192
193     private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
194         jobs.forEach(job -> pushBackJobAsync(broker, job));
195     }
196
197     private void accountThreadId() {
198         threadsIds.add(Thread.currentThread().getId());
199     }
200
201     @AfterMethod
202     public void threadsCounter() {
203         logger.info("participating threads count: " + threadsIds.size());
204         threadsIds.clear();
205     }
206
207     @BeforeMethod
208     public void initializeBroker() {
209         broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
210         ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
211     }
212
213     @Test(enabled = false)
214     public void givenSingleJob_getIt_verifySameJob() {
215         final Job originalJob = waitForFutureJob(newJobAsync(broker));
216
217         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
218         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
219     }
220
221     @Test(enabled = false)
222     public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
223         final List<Job> originalJobs = putALotOfJobs(broker);
224
225         MILLISECONDS.sleep(FEW);
226         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
227
228         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
229
230         MILLISECONDS.sleep(FEW);
231         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
232
233         pushBackJobAsync(broker, retrievedJob);
234
235         MILLISECONDS.sleep(FEW);
236         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
237     }
238
239     @Test(enabled = false)
240     public void givenManyJobs_getThemAll_verifySameJobs() {
241         putAndGetALotOfJobs(broker);
242     }
243
244     @Test(enabled = false)
245     public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
246         final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
247
248         pushBackJobs(retrievedJobs1, broker);
249         final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
250
251         assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
252     }
253
254     private static Date toDate(LocalDateTime localDateTime) {
255         return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
256     }
257
258     private void setModifiedDateToJob(UUID jobUuid, Date date) {
259         DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
260         job.setModified(date);
261         DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
262             session.saveOrUpdate(job);
263             return 1;
264         });
265     }
266
267
268     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
269         return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
270     }
271
272     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
273         JobDaoImpl job = new JobDaoImpl();
274         job.setUuid(UUID.randomUUID());
275         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
276         job.setIndexInBulk(indexInBulk);
277         job.setTemplateId(templateId);
278         job.setType(JobType.NoOp);
279         job.setStatus(status);
280         job.setTakenBy(takenBy);
281         job.setCreated(toDate(date));
282         job.setModified(toDate(date));
283         job.setUserId(userId);
284         if (deleted) {
285             job.setDeletedAt(new Date());
286         }
287         return job;
288     }
289
290     @DataProvider
291     public static Object[][] jobs(Method test) {
292         LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
293         UUID sameTemplate = UUID.randomUUID();
294         return new Object[][]{
295                 {ImmutableList.of(
296                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
297                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
298                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
299                         () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
300                         4,
301                         0,
302                         PENDING,
303                         "Broker should pull the first pending job by oldest date then by job index"
304                 },
305                 { ImmutableList.of(
306                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
307                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
308                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
309                         () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
310                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
311                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
312                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
313                   6,
314                   5,
315                   PENDING,
316                   "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
317                 },
318                 {ImmutableList.of(
319                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
320                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
321                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
322                         2,
323                         -1,
324                         PENDING,
325                         "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
326                 },
327                 {ImmutableList.of(
328                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
329                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
330                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
331                         2,
332                         -1,
333                         PENDING,
334                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
335                 },
336                 {ImmutableList.of(
337                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
338                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
339                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
340                         () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
341                         ),
342                         3,
343                         2,
344                         PENDING,
345                         "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
346                 },
347                 {ImmutableList.of(
348                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
349                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
350                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
351                         3,
352                         -1,
353                         PENDING,
354                         "Broker should not pull any job when there is another job from this template that was taken"
355                 },
356                 {ImmutableList.of(
357                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
358                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
359                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
360                         3,
361                         -1,
362                         PENDING,
363                         "Broker should not pull any job when there is another job from this template that in progress"
364                 },
365                 {ImmutableList.of(
366                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
367                         () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
368                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
369                         3,
370                         -1,
371                         PENDING,
372                         "Broker should not pull any job when there is another job from this template that was failed"
373                 },
374                 {ImmutableList.of(
375                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
376                         () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
377                         () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
378                    3,
379                    2,
380                    PENDING,
381                    "Broker should pull pending job when there is another job from this template that was deleted, although failed"
382                 },
383                 { ImmutableList.of(
384                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
385                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
386                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
387                         3,
388                         2,
389                         PENDING,
390                         "Broker should prioritize jobs of user that has no in-progress jobs"
391                 },
392                 {ImmutableList.of(
393                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
394                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
395                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
396                         3,
397                         2,
398                         PENDING,
399                         "Broker should prioritize jobs of user that has no taken jobs"
400                 },
401                 {ImmutableList.of(
402                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
403                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
404                         () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
405                         () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
406                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
407                         5,
408                         4,
409                         PENDING,
410                         "Broker should take oldest job when there is one in-progress job to each user"
411                 },
412                 {ImmutableList.of(
413                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
414                         () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
415                         () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
416                         2,
417                         -1,
418                         PENDING,
419                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
420                 },
421                 {ImmutableList.of(
422                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
423                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
424                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
425                         () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
426                         20,
427                         1,
428                         IN_PROGRESS,
429                         "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
430                 },
431                 {ImmutableList.of(
432                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
433                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
434                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
435                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
436                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
437                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
438                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
439                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
440                         () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
441                 ),
442                   20,
443                   6,
444                   IN_PROGRESS,
445                   "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
446                 },
447                 {ImmutableList.of(
448                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
449                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
450                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
451                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
452                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
453                         () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
454                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
455                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
456                         () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
457                 ),
458                         20,
459                         6,
460                         RESOURCE_IN_PROGRESS,
461                         "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
462                 },
463                 {ImmutableList.of(
464                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
465                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
466                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
467                         20,
468                         -1,
469                         IN_PROGRESS,
470                         "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
471                 },
472                 {ImmutableList.of(
473                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
474                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
475                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
476                         20,
477                         -1,
478                         RESOURCE_IN_PROGRESS,
479                         "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
480                 },
481                 {ImmutableList.of(
482                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
483                         () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
484                         () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
485                         1,
486                         2,
487                         CREATING,
488                         "Broker with creating topic should pull oldest creating job and ignore mso limit"
489                 },
490                 {ImmutableList.of(
491                         (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
492                         1,
493                         0,
494                         CREATING,
495                         "Broker with CREATING topic should pull CREATING job that was just modified"
496                 }
497
498         };
499     }
500
501     public interface Jobber {
502         // Will defer LocalDateTime.now() to test's "real-time"
503         JobDaoImpl toJob();
504     }
505
506     @Test(enabled = false, dataProvider = "jobs")
507     public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
508         JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
509         final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
510         for (JobDaoImpl job : jobs) {
511             Date modifiedDate = job.getModified();
512             broker.add(job);
513             setModifiedDateToJob(job.getUuid(), modifiedDate);
514         }
515         Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
516         boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
517         String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
518         Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
519         if (shouldAnyBeSelected) {
520             Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
521         }
522     }
523
524     @DataProvider
525     public Object[][] topics() {
526         return Arrays.stream(Job.JobStatus.values())
527                 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS).contains(t)))
528                 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
529     }
530
531     @Test(enabled = false, dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
532     public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
533         broker.pull(topic, UUID.randomUUID().toString());
534     }
535
536     @Test(enabled = false, expectedExceptions = NoJobException.class)
537     public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
538         Stream.of(Job.JobStatus.values())
539                 .filter(not(s -> s.equals(PENDING)))
540                 .map(s -> createMockJob("some user id", s))
541                 .map(job -> newJobAsync(broker, job))
542                 .map(this::waitForFutureJob)
543                 .collect(toList());
544
545         waitForFutureOptionalJob(pullJobAsync(broker));
546     }
547
548     @Test(enabled = false)
549     public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
550         newJobAsync(broker); // this negated the expected result of the call below
551         givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
552     }
553
554     @Test(enabled = false, expectedExceptions = NoJobException.class)
555     public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
556         putAndGetALotOfJobs(broker);
557         waitForFutureOptionalJob(pullJobAsync(broker));
558     }
559
560     @Test(enabled = false, expectedExceptions = NoJobException.class)
561     public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
562         final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
563         assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
564         waitForFutureOptionalJob(futureOptionalJob);
565     }
566
567     @Test(enabled = false, expectedExceptions = IllegalStateException.class)
568     public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
569         waitForFutureJob(newJobAsync(broker));
570         waitForFutureJob(newJobAsync(broker));
571         waitForFutureOptionalJob(pullJobAsync(broker));
572
573         Job myJob = createMockJob("user id");
574         myJob.setUuid(UUID.randomUUID());
575
576         broker.pushBack(myJob); //Should fail
577     }
578
579     @Test(enabled = false)
580     public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
581         final ImmutableMap<String, Object> randomDataForMostRecentJobType =
582                 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
583
584         waitForFutureJob(newJobAsync(broker));
585         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
586
587         job.setStatus(Job.JobStatus.PENDING);
588         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
589         job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
590         job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
591
592         broker.pushBack(job);
593         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
594
595         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
596         assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
597         assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
598     }
599
600     private static String jobDataReflected(Job job) {
601         return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
602                 .setExcludeFieldNames("created", "modified", "takenBy")
603                 .toString();
604     }
605
606     @Test(enabled = false, expectedExceptions = IllegalStateException.class)
607     public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
608         waitForFutureJob(newJobAsync(broker));
609         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
610
611         broker.pushBack(job);
612         broker.pushBack(job); //Should fail
613     }
614
615     @Test(enabled = false)
616     public void addJob_PeekItById_verifySameJobWasPeeked() {
617         String userId = UUID.randomUUID().toString();
618         Job myJob = createMockJob(userId);
619         UUID uuid = broker.add(myJob);
620         Job peekedJob = broker.peek(uuid);
621         assertEquals("added testId is not the same as peeked TestsId",
622                 userId,
623                 peekedJob.getSharedData().getUserId());
624     }
625
626     @Test(enabled = false, dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
627        public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
628         final Job job = waitForFutureJob(newJobAsync(broker, status));
629         broker.delete(job.getUuid());
630         assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
631         waitForFutureOptionalJob(pullJobAsync(broker));
632     }
633
634     @DataProvider
635     public static Object[][] jobStatusesForSuccessDelete() {
636         return new Object[][]{
637                 {PENDING},
638                 {STOPPED}
639         };
640     }
641
642     @Test(enabled = false, 
643             dataProvider = "jobStatusesForFailedDelete",
644             expectedExceptions = OperationNotAllowedException.class,
645             expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
646     )
647     public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
648         final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
649
650         if (taken) {
651             waitForFutureOptionalJob(pullJobAsync(broker));
652         }
653
654
655         broker.delete(job.getUuid());
656     }
657
658     @DataProvider
659     public static Object[][] jobStatusesForFailedDelete() {
660         return new Object[][]{
661                 {PENDING, true},
662                 {IN_PROGRESS, false},
663                 {COMPLETED, false},
664                 {PAUSE, false},
665                 {FAILED, false},
666         };
667     }
668
669     @Test(enabled = false, expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
670     public void deleteJob_notExist_exceptionIsThrown() {
671         waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
672         broker.delete(new UUID(111, 111));
673     }
674
675 }