Merge "VnfInPlaceFields and ScaleOut rendered dynamically"
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.services;
22
23
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.stream.Collectors.toList;
26 import static org.hamcrest.CoreMatchers.equalTo;
27 import static org.hamcrest.CoreMatchers.is;
28 import static org.hamcrest.MatcherAssert.assertThat;
29 import static org.hamcrest.Matchers.both;
30 import static org.hamcrest.Matchers.containsInAnyOrder;
31 import static org.onap.vid.job.Job.JobStatus.COMPLETED;
32 import static org.onap.vid.job.Job.JobStatus.CREATING;
33 import static org.onap.vid.job.Job.JobStatus.FAILED;
34 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
35 import static org.onap.vid.job.Job.JobStatus.PAUSE;
36 import static org.onap.vid.job.Job.JobStatus.PENDING;
37 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
38 import static org.onap.vid.job.Job.JobStatus.STOPPED;
39 import static org.onap.vid.utils.Streams.not;
40 import static org.testng.Assert.assertNotNull;
41 import static org.testng.AssertJUnit.assertEquals;
42
43 import com.google.common.collect.ImmutableList;
44 import com.google.common.collect.ImmutableMap;
45 import java.lang.reflect.Method;
46 import java.time.LocalDateTime;
47 import java.time.ZoneId;
48 import java.util.Arrays;
49 import java.util.Date;
50 import java.util.List;
51 import java.util.Optional;
52 import java.util.Set;
53 import java.util.UUID;
54 import java.util.concurrent.ConcurrentSkipListSet;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.ExecutorService;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.Future;
59 import java.util.concurrent.TimeoutException;
60 import java.util.stream.IntStream;
61 import java.util.stream.Stream;
62 import javax.inject.Inject;
63 import org.apache.commons.lang.RandomStringUtils;
64 import org.apache.commons.lang3.RandomUtils;
65 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
66 import org.apache.commons.lang3.builder.ToStringStyle;
67 import org.apache.log4j.LogManager;
68 import org.apache.log4j.Logger;
69 import org.hibernate.SessionFactory;
70 import org.onap.portalsdk.core.domain.support.DomainVo;
71 import org.onap.portalsdk.core.service.DataAccessService;
72 import org.onap.portalsdk.core.util.SystemProperties;
73 import org.onap.vid.config.DataSourceConfig;
74 import org.onap.vid.config.JobAdapterConfig;
75 import org.onap.vid.exceptions.GenericUncheckedException;
76 import org.onap.vid.exceptions.OperationNotAllowedException;
77 import org.onap.vid.job.Job;
78 import org.onap.vid.job.JobAdapter;
79 import org.onap.vid.job.JobType;
80 import org.onap.vid.job.JobsBrokerService;
81 import org.onap.vid.job.command.JobCommandFactoryTest;
82 import org.onap.vid.job.impl.JobDaoImpl;
83 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
84 import org.onap.vid.utils.DaoUtils;
85 import org.springframework.test.context.ContextConfiguration;
86 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
87 import org.testng.Assert;
88 import org.testng.annotations.AfterMethod;
89 import org.testng.annotations.BeforeMethod;
90 import org.testng.annotations.DataProvider;
91 import org.testng.annotations.Test;
92
93 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
94 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
95
96     private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
97
98     private static final int JOBS_COUNT = 127;
99     private static final boolean DELETED = true;
100     private final ExecutorService executor = Executors.newFixedThreadPool(90);
101
102     private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
103
104     private final long FEW = 1000;
105
106     private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
107     private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
108     private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
109     private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
110     private JobsBrokerService broker;
111
112     @Inject
113     JobAdapter jobAdapter;
114     @Inject
115     private DataAccessService dataAccessService;
116     @Inject
117     private SessionFactory sessionFactory;
118
119     /*
120     - pulling jobs is limited to inserted ones
121     - putting back allows getting the job again
122     - multi threads safety
123     - any added job should be visible with view
124
125     - edges:
126         - pulling with empty repo should return empty optional
127         - pulling more than expected should return empty optional
128         - putting one, over-pulling from a different thread
129         - take before inserting, then insert while waiting
130
131      */
132
133     private class NoJobException extends RuntimeException {
134     }
135
136     private Future<Job> newJobAsync(JobsBrokerService b) {
137         return newJobAsync(b, createMockJob("user id"));
138     }
139
140     private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
141         return newJobAsync(b, createMockJob("user id", status));
142     }
143
144     private Job createMockJob(String userId) {
145         return jobAdapter.createServiceInstantiationJob(
146                 JobType.NoOp,
147                 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
148                 UUID.randomUUID(),
149                 userId,
150                 "optimisticUniqueServiceInstanceName",
151                 RandomUtils.nextInt());
152     }
153
154     private Job createMockJob(String userId, Job.JobStatus jobStatus) {
155         Job job = createMockJob(userId);
156         job.setStatus(jobStatus);
157         return job;
158     }
159
160     private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
161         final Future<Job> jobFuture = executor.submit(() -> {
162             accountThreadId();
163
164             b.add(job);
165
166             return job;
167         });
168         return jobFuture;
169     }
170
171     private void pushBackJobAsync(JobsBrokerService b, Job job) {
172         executor.submit(() -> {
173             accountThreadId();
174             b.pushBack(job);
175             return job;
176         });
177     }
178
179     private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
180         final Future<Optional<Job>> job = executor.submit(() -> {
181             accountThreadId();
182             // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
183             return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
184         });
185         return job;
186     }
187
188     private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
189         try {
190             return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
191         } catch (TimeoutException | InterruptedException | ExecutionException e) {
192             throw new RuntimeException(e);
193         }
194     }
195
196     private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
197         try {
198             return retrievedJobFuture.get(FEW, MILLISECONDS);
199         } catch (TimeoutException | InterruptedException | ExecutionException e) {
200             throw new RuntimeException(e);
201         }
202     }
203
204     private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
205         final List<Job> originalJobs = putALotOfJobs(broker);
206         final List<Job> retrievedJobs = getAlotOfJobs(broker);
207
208         assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
209
210         return retrievedJobs;
211     }
212
213     private List<Job> putALotOfJobs(JobsBrokerService broker) {
214         int n = JOBS_COUNT;
215         return IntStream.range(0, n)
216                 .mapToObj(i -> newJobAsync(broker))
217                 .map(this::waitForFutureJob)
218                 .collect(toList());
219     }
220
221     private List<Job> getAlotOfJobs(JobsBrokerService broker) {
222         int n = JOBS_COUNT;
223         return IntStream.range(0, n)
224                 .mapToObj(i -> pullJobAsync(broker))
225                 .map(this::waitForFutureOptionalJob)
226                 .collect(toList());
227     }
228
229     private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
230         jobs.forEach(job -> pushBackJobAsync(broker, job));
231     }
232
233     private void accountThreadId() {
234         threadsIds.add(Thread.currentThread().getId());
235     }
236
237     @AfterMethod
238     public void threadsCounter() {
239         logger.info("participating threads count: " + threadsIds.size());
240         threadsIds.clear();
241     }
242
243     @BeforeMethod
244     public void initializeBroker() {
245         broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
246         ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
247     }
248
249     @Test
250     public void givenSingleJob_getIt_verifySameJob() {
251         final Job originalJob = waitForFutureJob(newJobAsync(broker));
252
253         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
254         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
255     }
256
257     @Test
258     public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
259         final List<Job> originalJobs = putALotOfJobs(broker);
260
261         MILLISECONDS.sleep(FEW);
262         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
263
264         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
265
266         MILLISECONDS.sleep(FEW);
267         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
268
269         pushBackJobAsync(broker, retrievedJob);
270
271         MILLISECONDS.sleep(FEW);
272         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
273     }
274
275     @Test
276     public void givenManyJobs_getThemAll_verifySameJobs() {
277         putAndGetALotOfJobs(broker);
278     }
279
280     @Test
281     public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
282         final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
283
284         pushBackJobs(retrievedJobs1, broker);
285         final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
286
287         assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
288     }
289
290     private static Date toDate(LocalDateTime localDateTime) {
291         return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
292     }
293
294     private void setModifiedDateToJob(UUID jobUuid, Date date) {
295         DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
296         job.setModified(date);
297         DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
298             session.saveOrUpdate(job);
299             return 1;
300         });
301     }
302
303
304     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
305         return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
306     }
307
308     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
309         JobDaoImpl job = new JobDaoImpl();
310         job.setUuid(UUID.randomUUID());
311         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
312         job.setIndexInBulk(indexInBulk);
313         job.setTemplateId(templateId);
314         job.setType(JobType.NoOp);
315         job.setStatus(status);
316         job.setTakenBy(takenBy);
317         job.setCreated(toDate(date));
318         job.setModified(toDate(date));
319         job.setUserId(userId);
320         if (deleted) {
321             job.setDeletedAt(new Date());
322         }
323         return job;
324     }
325
326     @DataProvider
327     public static Object[][] jobs(Method test) {
328         LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
329         UUID sameTemplate = UUID.randomUUID();
330         return new Object[][]{
331                 {ImmutableList.of(
332                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
333                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
334                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
335                         () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
336                         4,
337                         0,
338                         PENDING,
339                         "Broker should pull the first pending job by oldest date then by job index"
340                 },
341                 { ImmutableList.of(
342                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
343                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
344                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
345                         () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
346                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
347                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
348                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
349                   6,
350                   5,
351                   PENDING,
352                   "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
353                 },
354                 {ImmutableList.of(
355                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
356                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
357                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
358                         2,
359                         -1,
360                         PENDING,
361                         "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
362                 },
363                 {ImmutableList.of(
364                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
365                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
366                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
367                         2,
368                         -1,
369                         PENDING,
370                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
371                 },
372                 {ImmutableList.of(
373                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
374                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
375                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
376                         () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
377                         ),
378                         3,
379                         2,
380                         PENDING,
381                         "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
382                 },
383                 {ImmutableList.of(
384                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
385                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
386                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
387                         3,
388                         -1,
389                         PENDING,
390                         "Broker should not pull any job when there is another job from this template that was taken"
391                 },
392                 {ImmutableList.of(
393                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
394                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
395                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
396                         3,
397                         -1,
398                         PENDING,
399                         "Broker should not pull any job when there is another job from this template that in progress"
400                 },
401                 {ImmutableList.of(
402                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
403                         () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
404                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
405                         3,
406                         -1,
407                         PENDING,
408                         "Broker should not pull any job when there is another job from this template that was failed"
409                 },
410                 {ImmutableList.of(
411                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
412                         () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
413                         () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
414                    3,
415                    2,
416                    PENDING,
417                    "Broker should pull pending job when there is another job from this template that was deleted, although failed"
418                 },
419                 { ImmutableList.of(
420                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
421                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
422                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
423                         3,
424                         2,
425                         PENDING,
426                         "Broker should prioritize jobs of user that has no in-progress jobs"
427                 },
428                 {ImmutableList.of(
429                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
430                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
431                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
432                         3,
433                         2,
434                         PENDING,
435                         "Broker should prioritize jobs of user that has no taken jobs"
436                 },
437                 {ImmutableList.of(
438                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
439                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
440                         () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
441                         () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
442                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
443                         5,
444                         4,
445                         PENDING,
446                         "Broker should take oldest job when there is one in-progress job to each user"
447                 },
448                 {ImmutableList.of(
449                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
450                         () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
451                         () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
452                         2,
453                         -1,
454                         PENDING,
455                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
456                 },
457                 {ImmutableList.of(
458                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
459                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
460                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
461                         () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
462                         20,
463                         1,
464                         IN_PROGRESS,
465                         "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
466                 },
467                 {ImmutableList.of(
468                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
469                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
470                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
471                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
472                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
473                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
474                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
475                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
476                         () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
477                 ),
478                   20,
479                   6,
480                   IN_PROGRESS,
481                   "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
482                 },
483                 {ImmutableList.of(
484                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
485                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
486                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
487                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
488                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
489                         () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
490                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
491                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
492                         () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
493                 ),
494                         20,
495                         6,
496                         RESOURCE_IN_PROGRESS,
497                         "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
498                 },
499                 {ImmutableList.of(
500                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
501                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
502                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
503                         20,
504                         -1,
505                         IN_PROGRESS,
506                         "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
507                 },
508                 {ImmutableList.of(
509                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
510                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
511                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
512                         20,
513                         -1,
514                         RESOURCE_IN_PROGRESS,
515                         "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
516                 },
517                 {ImmutableList.of(
518                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
519                         () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
520                         () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
521                         1,
522                         2,
523                         CREATING,
524                         "Broker with creating topic should pull oldest creating job and ignore mso limit"
525                 },
526                 {ImmutableList.of(
527                         (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
528                         1,
529                         0,
530                         CREATING,
531                         "Broker with CREATING topic should pull CREATING job that was just modified"
532                 }
533
534         };
535     }
536
537     public interface Jobber {
538         // Will defer LocalDateTime.now() to test's "real-time"
539         JobDaoImpl toJob();
540     }
541
542     @Test(dataProvider = "jobs")
543     public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
544         JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
545         final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
546         for (JobDaoImpl job : jobs) {
547             Date modifiedDate = job.getModified();
548             broker.add(job);
549             setModifiedDateToJob(job.getUuid(), modifiedDate);
550         }
551         Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
552         boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
553         String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
554         Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
555         if (shouldAnyBeSelected) {
556             Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
557         }
558     }
559
560     @DataProvider
561     public Object[][] topics() {
562         return Arrays.stream(Job.JobStatus.values())
563                 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS).contains(t)))
564                 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
565     }
566
567     @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
568     public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
569         broker.pull(topic, UUID.randomUUID().toString());
570     }
571
572     @Test(expectedExceptions = NoJobException.class)
573     public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
574         Stream.of(Job.JobStatus.values())
575                 .filter(not(s -> s.equals(PENDING)))
576                 .map(s -> createMockJob("some user id", s))
577                 .map(job -> newJobAsync(broker, job))
578                 .map(this::waitForFutureJob)
579                 .collect(toList());
580
581         waitForFutureOptionalJob(pullJobAsync(broker));
582     }
583
584     @Test
585     public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
586         newJobAsync(broker); // this negated the expected result of the call below
587         givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
588     }
589
590     @Test(expectedExceptions = NoJobException.class)
591     public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
592         putAndGetALotOfJobs(broker);
593         waitForFutureOptionalJob(pullJobAsync(broker));
594     }
595
596     @Test(expectedExceptions = NoJobException.class)
597     public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
598         final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
599         assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
600         waitForFutureOptionalJob(futureOptionalJob);
601     }
602
603     @Test(expectedExceptions = IllegalStateException.class)
604     public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
605         waitForFutureJob(newJobAsync(broker));
606         waitForFutureJob(newJobAsync(broker));
607         waitForFutureOptionalJob(pullJobAsync(broker));
608
609         Job myJob = createMockJob("user id");
610         myJob.setUuid(UUID.randomUUID());
611
612         broker.pushBack(myJob); //Should fail
613     }
614
615     @Test
616     public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
617         final ImmutableMap<String, Object> randomDataForMostRecentJobType =
618                 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
619
620         waitForFutureJob(newJobAsync(broker));
621         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
622
623         job.setStatus(Job.JobStatus.PENDING);
624         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
625         job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
626         job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
627
628         broker.pushBack(job);
629         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
630
631         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
632         assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
633         assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
634     }
635
636     private static String jobDataReflected(Job job) {
637         return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
638                 .setExcludeFieldNames("created", "modified", "takenBy")
639                 .toString();
640     }
641
642     @Test(expectedExceptions = IllegalStateException.class)
643     public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
644         waitForFutureJob(newJobAsync(broker));
645         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
646
647         broker.pushBack(job);
648         broker.pushBack(job); //Should fail
649     }
650
651     @Test
652     public void addJob_PeekItById_verifySameJobWasPeeked() {
653         String userId = UUID.randomUUID().toString();
654         Job myJob = createMockJob(userId);
655         UUID uuid = broker.add(myJob);
656         Job peekedJob = broker.peek(uuid);
657         assertEquals("added testId is not the same as peeked TestsId",
658                 userId,
659                 peekedJob.getSharedData().getUserId());
660     }
661
662     @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
663        public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
664         final Job job = waitForFutureJob(newJobAsync(broker, status));
665         broker.delete(job.getUuid());
666         assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
667         waitForFutureOptionalJob(pullJobAsync(broker));
668     }
669
670     @DataProvider
671     public static Object[][] jobStatusesForSuccessDelete() {
672         return new Object[][]{
673                 {PENDING},
674                 {STOPPED}
675         };
676     }
677
678     @Test(
679             dataProvider = "jobStatusesForFailedDelete",
680             expectedExceptions = OperationNotAllowedException.class,
681             expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
682     )
683     public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
684         final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
685
686         if (taken) {
687             waitForFutureOptionalJob(pullJobAsync(broker));
688         }
689
690
691         broker.delete(job.getUuid());
692     }
693
694     @DataProvider
695     public static Object[][] jobStatusesForFailedDelete() {
696         return new Object[][]{
697                 {PENDING, true},
698                 {IN_PROGRESS, false},
699                 {COMPLETED, false},
700                 {PAUSE, false},
701                 {FAILED, false},
702         };
703     }
704
705     @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
706     public void deleteJob_notExist_exceptionIsThrown() {
707         waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
708         broker.delete(new UUID(111, 111));
709     }
710
711 }