Update license headers
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * VID
4  * ================================================================================
5  * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.vid.services;
22
23
24 import com.google.common.collect.ImmutableList;
25 import com.google.common.collect.ImmutableMap;
26 import org.apache.commons.lang.RandomStringUtils;
27 import org.apache.commons.lang3.RandomUtils;
28 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
29 import org.apache.commons.lang3.builder.ToStringStyle;
30 import org.apache.log4j.LogManager;
31 import org.apache.log4j.Logger;
32 import org.hibernate.SessionFactory;
33 import org.onap.portalsdk.core.domain.support.DomainVo;
34 import org.onap.portalsdk.core.service.DataAccessService;
35 import org.onap.portalsdk.core.util.SystemProperties;
36 import org.onap.vid.config.DataSourceConfig;
37 import org.onap.vid.config.JobAdapterConfig;
38 import org.onap.vid.exceptions.GenericUncheckedException;
39 import org.onap.vid.exceptions.OperationNotAllowedException;
40 import org.onap.vid.job.Job;
41 import org.onap.vid.job.JobAdapter;
42 import org.onap.vid.job.JobType;
43 import org.onap.vid.job.JobsBrokerService;
44 import org.onap.vid.job.command.JobCommandFactoryTest;
45 import org.onap.vid.job.impl.JobDaoImpl;
46 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
47 import org.onap.vid.utils.DaoUtils;
48 import org.springframework.test.context.ContextConfiguration;
49 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
50 import org.testng.Assert;
51 import org.testng.annotations.AfterMethod;
52 import org.testng.annotations.BeforeMethod;
53 import org.testng.annotations.DataProvider;
54 import org.testng.annotations.Test;
55
56 import javax.inject.Inject;
57 import java.lang.reflect.Method;
58 import java.time.LocalDateTime;
59 import java.time.ZoneId;
60 import java.util.*;
61 import java.util.concurrent.*;
62 import java.util.stream.IntStream;
63 import java.util.stream.Stream;
64
65 import static java.util.concurrent.TimeUnit.MILLISECONDS;
66 import static java.util.stream.Collectors.toList;
67 import static org.hamcrest.CoreMatchers.equalTo;
68 import static org.hamcrest.CoreMatchers.is;
69 import static org.hamcrest.MatcherAssert.assertThat;
70 import static org.hamcrest.Matchers.both;
71 import static org.hamcrest.Matchers.containsInAnyOrder;
72 import static org.onap.vid.job.Job.JobStatus.*;
73 import static org.onap.vid.utils.Streams.not;
74 import static org.testng.Assert.assertNotNull;
75 import static org.testng.AssertJUnit.assertEquals;
76
77 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
78 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
79
80     private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
81
82     private static final int JOBS_COUNT = 127;
83     private static final boolean DELETED = true;
84     private final ExecutorService executor = Executors.newFixedThreadPool(90);
85
86     private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
87
88     private final long FEW = 500;
89
90     private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
91     private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
92     private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
93     private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
94     private JobsBrokerService broker;
95
96     @Inject
97     JobAdapter jobAdapter;
98     @Inject
99     private DataAccessService dataAccessService;
100     @Inject
101     private SessionFactory sessionFactory;
102
    /*
    Test plan:
    - pulling jobs is limited to inserted ones
    - pushing a job back allows getting that job again
    - multi-thread safety
    - any added job should be visible with view

    - edge cases:
        - pulling from an empty repo should return an empty optional
        - pulling more than was inserted should return an empty optional
        - putting one, over-pulling from a different thread
        - take before inserting, then insert while waiting

     */
116
117     private class NoJobException extends RuntimeException {
118     }
119
120     private Future<Job> newJobAsync(JobsBrokerService b) {
121         return newJobAsync(b, createMockJob("user id"));
122     }
123
124     private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
125         return newJobAsync(b, createMockJob("user id", status));
126     }
127
128     private Job createMockJob(String userId) {
129         return jobAdapter.createServiceInstantiationJob(
130                 JobType.NoOp,
131                 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
132                 UUID.randomUUID(),
133                 userId,
134                 "optimisticUniqueServiceInstanceName",
135                 RandomUtils.nextInt());
136     }
137
138     private Job createMockJob(String userId, Job.JobStatus jobStatus) {
139         Job job = createMockJob(userId);
140         job.setStatus(jobStatus);
141         return job;
142     }
143
144     private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
145         final Future<Job> jobFuture = executor.submit(() -> {
146             accountThreadId();
147
148             b.add(job);
149
150             return job;
151         });
152         return jobFuture;
153     }
154
155     private void pushBackJobAsync(JobsBrokerService b, Job job) {
156         executor.submit(() -> {
157             accountThreadId();
158             b.pushBack(job);
159             return job;
160         });
161     }
162
163     private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
164         final Future<Optional<Job>> job = executor.submit(() -> {
165             accountThreadId();
166             // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
167             return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
168         });
169         return job;
170     }
171
172     private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
173         try {
174             return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
175         } catch (TimeoutException | InterruptedException | ExecutionException e) {
176             throw new RuntimeException(e);
177         }
178     }
179
180     private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
181         try {
182             return retrievedJobFuture.get(FEW, MILLISECONDS);
183         } catch (TimeoutException | InterruptedException | ExecutionException e) {
184             throw new RuntimeException(e);
185         }
186     }
187
188     private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
189         final List<Job> originalJobs = putALotOfJobs(broker);
190         final List<Job> retrievedJobs = getAlotOfJobs(broker);
191
192         assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
193
194         return retrievedJobs;
195     }
196
197     private List<Job> putALotOfJobs(JobsBrokerService broker) {
198         int n = JOBS_COUNT;
199         return IntStream.range(0, n)
200                 .mapToObj(i -> newJobAsync(broker))
201                 .map(this::waitForFutureJob)
202                 .collect(toList());
203     }
204
205     private List<Job> getAlotOfJobs(JobsBrokerService broker) {
206         int n = JOBS_COUNT;
207         return IntStream.range(0, n)
208                 .mapToObj(i -> pullJobAsync(broker))
209                 .map(this::waitForFutureOptionalJob)
210                 .collect(toList());
211     }
212
213     private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
214         jobs.forEach(job -> pushBackJobAsync(broker, job));
215     }
216
217     private void accountThreadId() {
218         threadsIds.add(Thread.currentThread().getId());
219     }
220
221     @AfterMethod
222     public void threadsCounter() {
223         logger.info("participating threads count: " + threadsIds.size());
224         threadsIds.clear();
225     }
226
227     @BeforeMethod
228     public void initializeBroker() {
229         broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
230         ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
231     }
232
233     @Test
234     public void givenSingleJob_getIt_verifySameJob() {
235         final Job originalJob = waitForFutureJob(newJobAsync(broker));
236
237         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
238         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
239     }
240
241     @Test
242     public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
243         final List<Job> originalJobs = putALotOfJobs(broker);
244
245         MILLISECONDS.sleep(FEW);
246         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
247
248         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
249
250         MILLISECONDS.sleep(FEW);
251         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
252
253         pushBackJobAsync(broker, retrievedJob);
254
255         MILLISECONDS.sleep(FEW);
256         assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
257     }
258
259     @Test
260     public void givenManyJobs_getThemAll_verifySameJobs() {
261         putAndGetALotOfJobs(broker);
262     }
263
264     @Test
265     public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
266         final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
267
268         pushBackJobs(retrievedJobs1, broker);
269         final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
270
271         assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
272     }
273
274     private static Date toDate(LocalDateTime localDateTime) {
275         return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
276     }
277
278     private void setModifiedDateToJob(UUID jobUuid, Date date) {
279         DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
280         job.setModified(date);
281         DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
282             session.saveOrUpdate(job);
283             return 1;
284         });
285     }
286
287
288     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
289         return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
290     }
291
292     public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
293         JobDaoImpl job = new JobDaoImpl();
294         job.setUuid(UUID.randomUUID());
295         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
296         job.setIndexInBulk(indexInBulk);
297         job.setTemplateId(templateId);
298         job.setType(JobType.NoOp);
299         job.setStatus(status);
300         job.setTakenBy(takenBy);
301         job.setCreated(toDate(date));
302         job.setModified(toDate(date));
303         job.setUserId(userId);
304         if (deleted) {
305             job.setDeletedAt(new Date());
306         }
307         return job;
308     }
309
310     @DataProvider
311     public static Object[][] jobs(Method test) {
312         LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
313         UUID sameTemplate = UUID.randomUUID();
314         return new Object[][]{
315                 {ImmutableList.of(
316                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
317                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
318                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
319                         () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
320                         4,
321                         0,
322                         PENDING,
323                         "Broker should pull the first pending job by oldest date then by job index"
324                 },
325                 { ImmutableList.of(
326                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
327                         () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
328                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
329                         () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
330                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
331                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
332                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
333                   6,
334                   5,
335                   PENDING,
336                   "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
337                 },
338                 {ImmutableList.of(
339                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
340                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
341                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
342                         2,
343                         -1,
344                         PENDING,
345                         "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
346                 },
347                 {ImmutableList.of(
348                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
349                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
350                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
351                         2,
352                         -1,
353                         PENDING,
354                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
355                 },
356                 {ImmutableList.of(
357                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
358                         () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
359                         () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
360                         () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
361                         ),
362                         3,
363                         2,
364                         PENDING,
365                         "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
366                 },
367                 {ImmutableList.of(
368                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
369                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
370                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
371                         3,
372                         -1,
373                         PENDING,
374                         "Broker should not pull any job when there is another job from this template that was taken"
375                 },
376                 {ImmutableList.of(
377                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
378                         () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
379                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
380                         3,
381                         -1,
382                         PENDING,
383                         "Broker should not pull any job when there is another job from this template that in progress"
384                 },
385                 {ImmutableList.of(
386                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
387                         () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
388                         () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
389                         3,
390                         -1,
391                         PENDING,
392                         "Broker should not pull any job when there is another job from this template that was failed"
393                 },
394                 {ImmutableList.of(
395                         (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
396                         () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
397                         () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
398                    3,
399                    2,
400                    PENDING,
401                    "Broker should pull pending job when there is another job from this template that was deleted, although failed"
402                 },
403                 { ImmutableList.of(
404                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
405                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
406                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
407                         3,
408                         2,
409                         PENDING,
410                         "Broker should prioritize jobs of user that has no in-progress jobs"
411                 },
412                 {ImmutableList.of(
413                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
414                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
415                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
416                         3,
417                         2,
418                         PENDING,
419                         "Broker should prioritize jobs of user that has no taken jobs"
420                 },
421                 {ImmutableList.of(
422                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
423                         () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
424                         () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
425                         () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
426                         () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
427                         5,
428                         4,
429                         PENDING,
430                         "Broker should take oldest job when there is one in-progress job to each user"
431                 },
432                 {ImmutableList.of(
433                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
434                         () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
435                         () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
436                         2,
437                         -1,
438                         PENDING,
439                         "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
440                 },
441                 {ImmutableList.of(
442                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
443                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
444                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
445                         () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
446                         20,
447                         1,
448                         IN_PROGRESS,
449                         "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
450                 },
451                 {ImmutableList.of(
452                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
453                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
454                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
455                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
456                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
457                         () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
458                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
459                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
460                         () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
461                 ),
462                   20,
463                   6,
464                   IN_PROGRESS,
465                   "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
466                 },
467                 {ImmutableList.of(
468                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
469                         () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
470                         () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
471                         () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
472                         () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
473                         () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
474                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
475                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
476                         () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
477                 ),
478                         20,
479                         6,
480                         RESOURCE_IN_PROGRESS,
481                         "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
482                 },
483                 {ImmutableList.of(
484                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
485                         () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
486                         () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
487                         20,
488                         -1,
489                         IN_PROGRESS,
490                         "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
491                 },
492                 {ImmutableList.of(
493                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
494                         () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
495                         () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
496                         20,
497                         -1,
498                         RESOURCE_IN_PROGRESS,
499                         "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
500                 },
501                 {ImmutableList.of(
502                         (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
503                         () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
504                         () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
505                         1,
506                         2,
507                         CREATING,
508                         "Broker with creating topic should pull oldest creating job and ignore mso limit"
509                 },
510                 {ImmutableList.of(
511                         (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
512                         1,
513                         0,
514                         CREATING,
515                         "Broker with CREATING topic should pull CREATING job that was just modified"
516                 }
517
518         };
519     }
520
521     public interface Jobber {
522         // Will defer LocalDateTime.now() to test's "real-time"
523         JobDaoImpl toJob();
524     }
525
526     @Test(dataProvider = "jobs")
527     public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
528         JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
529         final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
530         for (JobDaoImpl job : jobs) {
531             Date modifiedDate = job.getModified();
532             broker.add(job);
533             setModifiedDateToJob(job.getUuid(), modifiedDate);
534         }
535         Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
536         boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
537         String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
538         Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
539         if (shouldAnyBeSelected) {
540             Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
541         }
542     }
543
544     @DataProvider
545     public Object[][] topics() {
546         return Arrays.stream(Job.JobStatus.values())
547                 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS).contains(t)))
548                 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
549     }
550
551     @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
552     public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
553         broker.pull(topic, UUID.randomUUID().toString());
554     }
555
556     @Test(expectedExceptions = NoJobException.class)
557     public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
558         Stream.of(Job.JobStatus.values())
559                 .filter(not(s -> s.equals(PENDING)))
560                 .map(s -> createMockJob("some user id", s))
561                 .map(job -> newJobAsync(broker, job))
562                 .map(this::waitForFutureJob)
563                 .collect(toList());
564
565         waitForFutureOptionalJob(pullJobAsync(broker));
566     }
567
568     @Test
569     public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
570         newJobAsync(broker); // this negated the expected result of the call below
571         givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
572     }
573
574     @Test(expectedExceptions = NoJobException.class)
575     public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
576         putAndGetALotOfJobs(broker);
577         waitForFutureOptionalJob(pullJobAsync(broker));
578     }
579
580     @Test(expectedExceptions = NoJobException.class)
581     public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
582         final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
583         assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
584         waitForFutureOptionalJob(futureOptionalJob);
585     }
586
587     @Test(expectedExceptions = IllegalStateException.class)
588     public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
589         waitForFutureJob(newJobAsync(broker));
590         waitForFutureJob(newJobAsync(broker));
591         waitForFutureOptionalJob(pullJobAsync(broker));
592
593         Job myJob = createMockJob("user id");
594         myJob.setUuid(UUID.randomUUID());
595
596         broker.pushBack(myJob); //Should fail
597     }
598
599     @Test
600     public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
601         final ImmutableMap<String, Object> randomDataForMostRecentJobType =
602                 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
603
604         waitForFutureJob(newJobAsync(broker));
605         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
606
607         job.setStatus(Job.JobStatus.PENDING);
608         job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
609         job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
610         job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
611
612         broker.pushBack(job);
613         final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
614
615         assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
616         assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
617         assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
618     }
619
620     private static String jobDataReflected(Job job) {
621         return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
622                 .setExcludeFieldNames("created", "modified", "takenBy")
623                 .toString();
624     }
625
626     @Test(expectedExceptions = IllegalStateException.class)
627     public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
628         waitForFutureJob(newJobAsync(broker));
629         final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
630
631         broker.pushBack(job);
632         broker.pushBack(job); //Should fail
633     }
634
635     @Test
636     public void addJob_PeekItById_verifySameJobWasPeeked() {
637         String userId = UUID.randomUUID().toString();
638         Job myJob = createMockJob(userId);
639         UUID uuid = broker.add(myJob);
640         Job peekedJob = broker.peek(uuid);
641         assertEquals("added testId is not the same as peeked TestsId",
642                 userId,
643                 peekedJob.getSharedData().getUserId());
644     }
645
646     @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
647        public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
648         final Job job = waitForFutureJob(newJobAsync(broker, status));
649         broker.delete(job.getUuid());
650         assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
651         waitForFutureOptionalJob(pullJobAsync(broker));
652     }
653
654     @DataProvider
655     public static Object[][] jobStatusesForSuccessDelete() {
656         return new Object[][]{
657                 {PENDING},
658                 {STOPPED}
659         };
660     }
661
662     @Test(
663             dataProvider = "jobStatusesForFailedDelete",
664             expectedExceptions = OperationNotAllowedException.class,
665             expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
666     )
667     public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
668         final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
669
670         if (taken) {
671             waitForFutureOptionalJob(pullJobAsync(broker));
672         }
673
674
675         broker.delete(job.getUuid());
676     }
677
678     @DataProvider
679     public static Object[][] jobStatusesForFailedDelete() {
680         return new Object[][]{
681                 {PENDING, true},
682                 {IN_PROGRESS, false},
683                 {COMPLETED, false},
684                 {PAUSE, false},
685                 {FAILED, false},
686         };
687     }
688
689     @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
690     public void deleteJob_notExist_exceptionIsThrown() {
691         waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
692         broker.delete(new UUID(111, 111));
693     }
694
695 }