2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.vid.services;
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.stream.Collectors.toList;
26 import static org.hamcrest.CoreMatchers.equalTo;
27 import static org.hamcrest.CoreMatchers.is;
28 import static org.hamcrest.MatcherAssert.assertThat;
29 import static org.hamcrest.Matchers.both;
30 import static org.hamcrest.Matchers.containsInAnyOrder;
31 import static org.mockito.Mockito.when;
32 import static org.onap.vid.job.Job.JobStatus.COMPLETED;
33 import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_ERRORS;
34 import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_NO_ACTION;
35 import static org.onap.vid.job.Job.JobStatus.CREATING;
36 import static org.onap.vid.job.Job.JobStatus.FAILED;
37 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
38 import static org.onap.vid.job.Job.JobStatus.PAUSE;
39 import static org.onap.vid.job.Job.JobStatus.PENDING;
40 import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
41 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
42 import static org.onap.vid.job.Job.JobStatus.STOPPED;
43 import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
44 import static org.onap.vid.utils.Streams.not;
45 import static org.testng.Assert.assertNotNull;
46 import static org.testng.AssertJUnit.assertEquals;
47 import static org.testng.AssertJUnit.assertFalse;
49 import com.google.common.collect.ImmutableList;
50 import com.google.common.collect.ImmutableMap;
51 import java.lang.reflect.Method;
52 import java.time.LocalDateTime;
53 import java.time.ZoneId;
54 import java.util.Arrays;
55 import java.util.Date;
56 import java.util.List;
57 import java.util.Optional;
59 import java.util.UUID;
60 import java.util.concurrent.ConcurrentSkipListSet;
61 import java.util.concurrent.ExecutionException;
62 import java.util.concurrent.ExecutorService;
63 import java.util.concurrent.Executors;
64 import java.util.concurrent.Future;
65 import java.util.concurrent.TimeoutException;
66 import java.util.stream.Collectors;
67 import java.util.stream.IntStream;
68 import java.util.stream.Stream;
69 import javax.inject.Inject;
70 import org.apache.commons.lang.RandomStringUtils;
71 import org.apache.commons.lang3.RandomUtils;
72 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
73 import org.apache.commons.lang3.builder.ToStringStyle;
74 import org.apache.commons.lang3.tuple.Pair;
75 import org.apache.log4j.LogManager;
76 import org.apache.log4j.Logger;
77 import org.hibernate.SessionFactory;
78 import org.jetbrains.annotations.NotNull;
79 import org.mockito.Mock;
80 import org.mockito.MockitoAnnotations;
81 import org.onap.portalsdk.core.domain.support.DomainVo;
82 import org.onap.portalsdk.core.service.DataAccessService;
83 import org.onap.portalsdk.core.util.SystemProperties;
84 import org.onap.vid.config.DataSourceConfig;
85 import org.onap.vid.config.JobAdapterConfig;
86 import org.onap.vid.exceptions.GenericUncheckedException;
87 import org.onap.vid.exceptions.OperationNotAllowedException;
88 import org.onap.vid.job.Job;
89 import org.onap.vid.job.JobAdapter;
90 import org.onap.vid.job.JobAdapter.AsyncJobRequest;
91 import org.onap.vid.job.JobType;
92 import org.onap.vid.job.JobsBrokerService;
93 import org.onap.vid.job.command.JobCommandFactoryTest;
94 import org.onap.vid.job.impl.JobDaoImpl;
95 import org.onap.vid.job.impl.JobSchedulerInitializer;
96 import org.onap.vid.job.impl.JobSharedData;
97 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
98 import org.onap.vid.utils.DaoUtils;
99 import org.springframework.test.context.ContextConfiguration;
100 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
101 import org.testng.Assert;
102 import org.testng.annotations.AfterMethod;
103 import org.testng.annotations.BeforeMethod;
104 import org.testng.annotations.DataProvider;
105 import org.testng.annotations.Test;
106 import org.togglz.core.manager.FeatureManager;
108 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
109 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
111 private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
113 private static final int JOBS_COUNT = 127;
114 private static final boolean DELETED = true;
115 private final ExecutorService executor = Executors.newFixedThreadPool(90);
117 private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
119 private final long FEW = 1000;
120 private final long SOME = 2000;
122 private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
123 private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
124 private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
125 private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
126 private JobsBrokerService broker;
129 private FeatureManager featureManager;
132 JobAdapter jobAdapter;
134 private DataAccessService dataAccessService;
136 private SessionFactory sessionFactory;
139 private VersionService versionService;
142 public void threadsCounter() {
143 logger.info("participating threads count: " + threadsIds.size());
148 public void initializeBroker() {
149 MockitoAnnotations.initMocks(this);
150 when(versionService.retrieveBuildNumber()).thenReturn("aBuildNumber");
151 broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService, featureManager);
152 ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
156 - pulling jobs is limited to inserted ones
157 - putting back allows getting the job again
158 - multi threads safety
159 - any added job should be visible with view
162 - pulling with empty repo should return empty optional
163 - pulling more than expected should return empty optional
164 - putting one, over-pulling from a different thread
165 - take before inserting, then insert while waiting
169 private class NoJobException extends RuntimeException {
172 private Future<Job> newJobAsync(JobsBrokerService b) {
173 return newJobAsync(b, createMockJob("user id"));
176 private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
177 return newJobAsync(b, createMockJob("user id", status));
180 private Job createMockJob(String userId) {
181 return jobAdapter.createServiceInstantiationJob(
183 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
187 "optimisticUniqueServiceInstanceName",
188 RandomUtils.nextInt());
191 private Job createMockJob(String userId, Job.JobStatus jobStatus) {
192 Job job = createMockJob(userId);
193 job.setStatus(jobStatus);
197 private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
198 final Future<Job> jobFuture = executor.submit(() -> {
208 private void pushBackJobAsync(JobsBrokerService b, Job job) {
209 executor.submit(() -> {
216 private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
217 final Future<Optional<Job>> job = executor.submit(() -> {
219 // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
220 return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
225 private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
227 return retrievedOptionalJobFuture.get(SOME, MILLISECONDS).orElseThrow(NoJobException::new);
228 } catch (TimeoutException | InterruptedException | ExecutionException e) {
229 throw new RuntimeException(e);
233 private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
235 return retrievedJobFuture.get(SOME, MILLISECONDS);
236 } catch (TimeoutException | InterruptedException | ExecutionException e) {
237 throw new RuntimeException(e);
241 private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
242 final List<Job> originalJobs = putALotOfJobs(broker);
243 final List<Job> retrievedJobs = getAlotOfJobs(broker);
245 assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
247 return retrievedJobs;
250 private List<Job> putALotOfJobs(JobsBrokerService broker) {
252 return IntStream.range(0, n)
253 .mapToObj(i -> newJobAsync(broker))
254 .map(this::waitForFutureJob)
258 private List<Job> getAlotOfJobs(JobsBrokerService broker) {
260 return IntStream.range(0, n)
261 .mapToObj(i -> pullJobAsync(broker))
262 .map(this::waitForFutureOptionalJob)
266 private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
267 jobs.forEach(job -> pushBackJobAsync(broker, job));
270 private void accountThreadId() {
271 threadsIds.add(Thread.currentThread().getId());
275 public void givenSingleJob_getIt_verifySameJob() {
276 final Job originalJob = waitForFutureJob(newJobAsync(broker));
278 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
279 assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
283 public static Object[][] allTopics() {
284 return JobSchedulerInitializer.WORKERS_TOPICS.stream()
285 .map(topic -> new Object[] { topic })
286 .toArray(Object[][]::new);
289 @Test(dataProvider = "allTopics")
290 public void givenJobFromSameBuild_pullJobs_jobIsPulled(Job.JobStatus topic) {
291 when(versionService.retrieveBuildNumber()).thenReturn("someVersion");
292 Job mockedJob = createMockJob("user id", topic);
293 UUID uuid = broker.add(mockedJob);
294 assertEquals(uuid, broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
298 @Test(dataProvider = "allTopics")
299 public void givenJobFromOtherBuild_pullJobs_noneIsPulled(Job.JobStatus topic) {
300 when(versionService.retrieveBuildNumber()).thenReturn("old");
301 Job mockedJob = createMockJob("user id", topic);
302 broker.add(mockedJob);
303 when(versionService.retrieveBuildNumber()).thenReturn("new");
304 assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
308 public void givenJobWithNullBuildAndJobWithRealBuild_pullJobs_jobsWithNonNullIsPulled() {
309 Job.JobStatus topic = PENDING;
311 //push job with null build
312 when(versionService.retrieveBuildNumber()).thenReturn(null);
313 broker.add(createMockJob("user id", topic));
315 //push job with "aBuild" build
316 when(versionService.retrieveBuildNumber()).thenReturn("aBuild");
317 UUID newJobId = broker.add(createMockJob("user id", topic));
319 //pull jobs while current build is still "aBuild". Only the non null build is pulled
320 assertEquals(newJobId, broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
322 //no more jobs to pull
323 assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
328 public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
329 final List<Job> originalJobs = putALotOfJobs(broker);
331 MILLISECONDS.sleep(FEW);
332 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
334 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
336 MILLISECONDS.sleep(FEW);
337 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
339 pushBackJobAsync(broker, retrievedJob);
341 MILLISECONDS.sleep(FEW);
342 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
346 public void givenManyJobs_getThemAll_verifySameJobs() {
347 putAndGetALotOfJobs(broker);
351 public void givenManyJobs_getThemAllThenPushBackAndGet_verifySameJobs() {
352 final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
354 pushBackJobs(retrievedJobs1, broker);
355 final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
357 assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
360 private static Date toDate(LocalDateTime localDateTime) {
361 return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
364 private void setModifiedDateToJob(UUID jobUuid, Date date) {
365 DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
366 job.setModified(date);
367 DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
368 session.saveOrUpdate(job);
374 public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
375 return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
378 public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
379 JobDaoImpl job = new JobDaoImpl();
380 job.setUuid(UUID.randomUUID());
381 job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
382 job.setIndexInBulk(indexInBulk);
383 job.setTemplateId(templateId);
384 job.setType(JobType.NoOp);
385 job.setStatus(status);
386 job.setTakenBy(takenBy);
387 job.setCreated(toDate(date));
388 job.setModified(toDate(date));
389 job.setUserId(userId);
391 job.setDeletedAt(new Date());
397 public static Object[][] jobs(Method test) {
398 LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
399 UUID sameTemplate = UUID.randomUUID();
400 return new Object[][]{
402 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
403 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
404 () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
405 () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
409 "Broker should pull the first pending job by oldest date then by job index"
412 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
413 () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
414 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
415 () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
416 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
417 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
418 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
422 "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
425 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
426 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
427 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
431 "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
434 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
435 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
436 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
440 "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
443 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
444 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
445 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
446 () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
451 "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
454 (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
455 () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
456 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
460 "Broker should not pull any job when there is another job from this template that was taken"
463 (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
464 () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
465 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
469 "Broker should not pull any job when there is another job from this template that in progress"
472 (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
473 () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
474 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
478 "Broker should not pull any job when there is another job from this template that was failed"
481 (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
482 () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
483 () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
487 "Broker should pull pending job when there is another job from this template that was deleted, although failed"
490 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
491 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
492 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
496 "Broker should prioritize jobs of user that has no in-progress jobs"
499 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
500 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
501 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
505 "Broker should prioritize jobs of user that has no taken jobs"
508 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
509 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
510 () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
511 () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
512 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
516 "Broker should take oldest job when there is one in-progress job to each user"
519 (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
520 () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
521 () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
525 "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
528 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
529 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
530 () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
531 () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
535 "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
538 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
539 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
540 () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
541 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
542 () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
543 () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
544 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
545 () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
546 () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
551 "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
554 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
555 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
556 () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
557 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
558 () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
559 () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
560 () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
561 () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
562 () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
566 RESOURCE_IN_PROGRESS,
567 "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
570 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
571 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
572 () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
576 "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
579 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
580 () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
581 () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
584 RESOURCE_IN_PROGRESS,
585 "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
588 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
589 () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
590 () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
594 "Broker with creating topic should pull oldest creating job and ignore mso limit"
597 (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
601 "Broker with CREATING topic should pull CREATING job that was just modified"
607 public interface Jobber {
608 // Will defer LocalDateTime.now() to test's "real-time"
612 @Test(dataProvider = "jobs")
613 public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
614 JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService, featureManager);
615 final List<JobDaoImpl> jobs = addJobsWithModifiedDate(jobbers, aBroker);
616 Optional<Job> nextJob = aBroker.pull(topic, UUID.randomUUID().toString());
617 boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
618 String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
619 Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
620 if (shouldAnyBeSelected) {
621 Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
626 protected List<JobDaoImpl> addJobsWithModifiedDate(List<Jobber> jobbers, JobsBrokerService broker) {
627 final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
628 return addJobsWithModifiedDateByJobDao(jobs, broker);
632 private List<JobDaoImpl> addJobsWithModifiedDateByJobDao(List<JobDaoImpl> jobs, JobsBrokerService broker) {
633 for (JobDaoImpl job : jobs) {
634 Date modifiedDate = job.getModified();
636 setModifiedDateToJob(job.getUuid(), modifiedDate);
642 public static Object[][] jobsForTestingPendingResource(Method test) {
643 UUID templateId1 = UUID.fromString("311a9196-bbc5-47a1-8b11-bf0f9db1c7ca");
644 UUID templateId2 = UUID.fromString("4f1522f9-642e-49f7-af75-a2f344085bcc");
645 return new Object[][]{
646 {ImmutableList.of( (Jobber)
647 () -> createNewJob(12, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
648 () -> createNewJob(1, templateId2, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
649 () -> createNewJob(2, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
650 () -> createNewJob(3, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
651 () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false)
654 "given there is only one in the queue in PENDING_RESOURCE and no other job with same templateId, then this job is selected"
656 {ImmutableList.of( (Jobber)
657 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
658 () -> createNewJob(3, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(2), false),
659 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
662 "given multiple jobs with same templateId in PENDING_RESOURCE, then job with lowest indexInBulk is selected"
664 {ImmutableList.of( (Jobber)
665 () -> createNewJob(1, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
666 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
669 "given multiple jobs with same indexInBulk, then job with lowest templateId is selected"
671 {ImmutableList.of( (Jobber)
672 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
673 () -> createNewJob(2, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false)
676 "given multiple jobs with different indexInBulk and different templateId, then job with lowest indexInBulk is selected"
678 {ImmutableList.of( (Jobber)
679 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
680 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false)
683 "given there is already taken job with same templateId, then no job is selected"
685 {ImmutableList.of( (Jobber)
686 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
687 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false),
688 () -> createNewJob(9, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
689 () -> createNewJob(8, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
692 "given 4 jobs, 2 jobs templateId1 but one of them is taken, and 2 jobs with templateId2, then select job with templateId2"
694 {ImmutableList.of( (Jobber)
695 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
696 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), true)
699 "given 2 jobs with same templateId, one of them is taken but deleted, then the other job is selected"
701 {ImmutableList.of( (Jobber)
702 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
703 () -> createNewJob(1, templateId1, "userId", IN_PROGRESS, null, LocalDateTime.now(), false)
706 "given 2 jobs with same templateId, one of them is IN_PROGRESS, then no job is selected"
708 {ImmutableList.of( (Jobber)
709 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
710 () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false)
713 "given 2 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS, then no job is selected"
715 {ImmutableList.of( (Jobber)
716 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
717 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
718 () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), true)
721 "given 3 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS but deleted, then other job with lowest indexInBulk is selected"
723 {ImmutableList.of( (Jobber)
724 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
725 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
726 () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false),
727 () -> createNewJob(12, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
728 () -> createNewJob(11, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
731 "given 5 jobs, 3 with templateId1 that one of them is RESOURCE_IN_PROGRESS,"+
732 "2 with templateId2 both in PENDING_RESOURCE, then job with lowest indexInBulk from templateId2 is selected"
735 {ImmutableList.of( (Jobber)
736 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), true)
739 "given 1 job in PENDING_RESOURCE but it's deleted, then no job is selected"
741 {ImmutableList.of( (Jobber)
742 () -> createNewJob(20, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
743 () -> createNewJob(1, templateId1, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
744 () -> createNewJob(2, templateId1, "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
745 () -> createNewJob(3, templateId1, "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
746 () -> createNewJob(4, templateId1, "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
747 () -> createNewJob(5, templateId1, "userId", STOPPED, null, LocalDateTime.now().minusSeconds(1), false),
748 () -> createNewJob(6, templateId1, "userId", PAUSE, null, LocalDateTime.now().minusSeconds(1), false)
751 "given multiple jobs with same templateId, 1 in PENDING_RESOURCE, and other are not in progress, "+
752 "then the job in PENDING_RESOURCE is selected"
754 {ImmutableList.of( (Jobber)
755 () -> createNewJob(1, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
756 () -> createNewJob(2, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
757 () -> createNewJob(3, UUID.randomUUID(), "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
758 () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
759 () -> createNewJob(5, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
760 () -> createNewJob(6, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false)
763 "given there is no job in PENDING_RESOURCE state, then no job is selected"
765 {ImmutableList.of( (Jobber)
766 () -> createNewJob(6, null, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false)
769 "given there is 1 job in PENDING_RESOURCE but without templateId, then no job is selected"
/**
 * Runs one row of the "jobsForTestingPendingResource" data provider through the shared
 * pull-next-job check, using PENDING_RESOURCE as the topic.
 *
 * @param jobbers               job factories describing the jobs of this scenario
 * @param expectedIndexSelected forwarded unchanged to the shared check
 *                              (presumably the position in {@code jobbers} of the job expected to be pulled - confirm)
 * @param assertionReason       scenario description, forwarded to the shared check
 */
774 @Test(dataProvider = "jobsForTestingPendingResource")
775 public void givenSomeJobs_pullPendingResource_returnNextOrNothingAsExpected(List<Jobber> jobbers, int expectedIndexSelected, String assertionReason) {
// NOTE(review): the literal 1 is the helper's second argument; its meaning is defined by the helper, which is not shown here.
776 givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(jobbers, 1, expectedIndexSelected, PENDING_RESOURCE, assertionReason);
/**
 * Convenience overload that delegates to the seven-argument createNewJob with
 * 1 as first argument, a random UUID as second, "af456" as third, and
 * {@code LocalDateTime.now()} minus {@code secondsOffset} seconds as the date.
 *
 * @param status        status passed to the delegate
 * @param takenBy       passed to the delegate as its fifth argument
 * @param secondsOffset number of seconds subtracted from the current time
 * @param deleted       passed to the delegate as its last argument
 * @return the JobDaoImpl returned by the delegate
 */
779 public static JobDaoImpl createNewJob(Job.JobStatus status, String takenBy, long secondsOffset, boolean deleted) {
780 return createNewJob(1, UUID.randomUUID(), "af456", status, takenBy, LocalDateTime.now().minusSeconds(secondsOffset), deleted);
// Fills the broker with jobs, calls broker.deleteOldFinalJobs(seconds) and verifies that
// broker.peek() afterwards returns exactly the jobs whose flag is true.
// NOTE(review): `seconds` is declared on a line that is not visible here.
784 public void givenSomeJobs_deleteOldFinalJobs_onlyExpectedJobsAreDeleted() {
// Each pair is (job, flag); the flag feeds the filter that builds expectedJobs below,
// so true means "expected to still be returned by peek() after the cleanup".
786 final List<Pair<JobDaoImpl,Boolean>> jobs = ImmutableList.of(
// offset seconds+1, not marked deleted, in these statuses: flagged true
788 Pair.of(createNewJob(IN_PROGRESS, null, seconds+1, false), true),
789 Pair.of(createNewJob(RESOURCE_IN_PROGRESS, null, seconds+1, false), true),
790 Pair.of(createNewJob(PENDING, null, seconds+1, false), true),
791 Pair.of(createNewJob(CREATING, null, seconds+1, false), true),
792 Pair.of(createNewJob(PENDING_RESOURCE, null, seconds+1, false), true),
793 Pair.of(createNewJob(PAUSE, null, seconds+1, false), true),
// offset seconds+1 in these statuses (two created with deleted=true, one with a takenBy value): flagged false
796 Pair.of(createNewJob(COMPLETED, null, seconds+1, false), false),
797 Pair.of(createNewJob(FAILED, null, seconds+1, false), false),
798 Pair.of(createNewJob(STOPPED, null, seconds+1, false), false),
799 Pair.of(createNewJob(COMPLETED_WITH_ERRORS, null, seconds+1, true), false),
800 Pair.of(createNewJob(COMPLETED_WITH_NO_ACTION, generateRandomAlphaNumeric(5), seconds+1, true), false),
// COMPLETED with offsets seconds-2, seconds-400 and 0: flagged true
803 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-2, false), true),
804 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-400, false), true),
805 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), 0, false), true)
// hand the job objects (left side of each pair) to the helper together with the broker
// (presumably it stores them keeping their prepared dates - confirm in the helper)
807 addJobsWithModifiedDateByJobDao(jobs.stream().map(Pair::getLeft).collect(Collectors.toList()), broker);
// sanity check before the cleanup: peek() returns as many jobs as were prepared
808 assertEquals(jobs.size(), broker.peek().size());
810 broker.deleteOldFinalJobs(seconds);
// expected result: (uuid, status) of every job flagged true
811 Stream<Pair<UUID, Job.JobStatus>> expectedJobs = jobs.stream()
812 .filter(Pair::getRight)
814 x.getLeft().getUuid(),
815 x.getLeft().getStatus()
// order is irrelevant; only the set of (uuid, status) pairs is compared
817 assertThat(broker.peek().stream().map(x->Pair.of(x.getUuid(), x.getStatus())).collect(Collectors.toList()),
818 containsInAnyOrder(expectedJobs.toArray()));
// Supplies one row per Job.JobStatus value other than PENDING, IN_PROGRESS, CREATING,
// RESOURCE_IN_PROGRESS and PENDING_RESOURCE; each row is a single-element Object[] holding the status.
// Referenced by name ("topics") from pullUnexpectedTopic_exceptionIsThrown.
822 public Object[][] topics() {
823 return Arrays.stream(Job.JobStatus.values())
// drop the five statuses listed here; everything else is kept
824 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS, PENDING_RESOURCE).contains(t)))
825 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
/**
 * Calls broker.pull with each status supplied by the "topics" data provider and a random
 * string as second argument; the test passes only if a GenericUncheckedException whose
 * message matches "Unsupported topic.*" is thrown.
 *
 * @param topic status used as the pull topic
 */
828 @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
829 public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
830 broker.pull(topic, UUID.randomUUID().toString());
/**
 * Creates a mock job for every Job.JobStatus except PENDING, submits each through
 * newJobAsync and waits for it, then requests a job via pullJobAsync.
 * The test passes only if NoJobException is thrown.
 * NOTE(review): the terminal operation of the stream is on a line not visible here;
 * pullJobAsync presumably pulls on the PENDING topic - confirm in the helper.
 */
833 @Test(expectedExceptions = NoJobException.class)
834 public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
835 Stream.of(Job.JobStatus.values())
// every status but PENDING
836 .filter(not(s -> s.equals(PENDING)))
837 .map(s -> createMockJob("some user id", s))
838 .map(job -> newJobAsync(broker, job))
839 .map(this::waitForFutureJob)
842 waitForFutureOptionalJob(pullJobAsync(broker));
// Submits one extra job through newJobAsync(broker) and then reuses the body of
// givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved.
// NOTE(review): the extra job is presumably created as PENDING, which is why the delegated
// body is expected to retrieve a job here instead of throwing - confirm in newJobAsync.
846 public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
847 newJobAsync(broker); // this negated the expected result of the call below
848 givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
/**
 * Runs putAndGetALotOfJobs against the broker and then requests one more job.
 * The test passes only if NoJobException is thrown.
 * NOTE(review): per its name the helper adds many jobs and pulls all of them back - confirm in the helper.
 */
851 @Test(expectedExceptions = NoJobException.class)
852 public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
853 putAndGetALotOfJobs(broker);
// one pull beyond what the helper did
854 waitForFutureOptionalJob(pullJobAsync(broker));
/**
 * Requests a job without adding any first. The future must complete within FEW milliseconds
 * with an empty Optional; the test then passes only if NoJobException is thrown.
 *
 * @throws InterruptedException if the wait on the future is interrupted
 * @throws ExecutionException   if the asynchronous pull itself failed
 * @throws TimeoutException     if the future does not complete within FEW milliseconds
 */
857 @Test(expectedExceptions = NoJobException.class)
858 public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
859 final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
// the pull result must be an empty Optional
860 assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
861 waitForFutureOptionalJob(futureOptionalJob);
/**
 * Adds two jobs, pulls one, then pushes back a freshly created mock job carrying a new
 * random UUID. The test passes only if IllegalStateException is thrown.
 */
864 @Test(expectedExceptions = IllegalStateException.class)
865 public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
866 waitForFutureJob(newJobAsync(broker));
867 waitForFutureJob(newJobAsync(broker));
868 waitForFutureOptionalJob(pullJobAsync(broker));
// a job object that was not obtained from the broker, with a random UUID
870 Job myJob = createMockJob("user id");
871 myJob.setUuid(UUID.randomUUID());
873 broker.pushBack(myJob); //Should fail
// Adds a job, pulls it, changes its status and type/data, pushes it back and pulls again;
// the re-pulled job must match the pushed one, and its data must be the map passed to the
// last setTypeAndData call.
877 public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
878 final ImmutableMap<String, Object> randomDataForMostRecentJobType =
879 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
881 waitForFutureJob(newJobAsync(broker));
882 final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
// three consecutive type/data changes; the assertions below expect getData() to equal the last map
884 job.setStatus(Job.JobStatus.PENDING);
885 job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
886 job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
887 job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
889 broker.pushBack(job);
890 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
892 assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
893 assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
// compare the reflective string form of both jobs as produced by jobDataReflected
894 assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
/**
 * Builds a reflective string form of the job with ReflectionToStringBuilder in
 * SHORT_PREFIX_STYLE, leaving out the fields "created", "modified" and "takenBy".
 * NOTE(review): the end of the builder chain is on a line not visible here.
 *
 * @param job job to describe
 * @return the string form of the job
 */
897 private static String jobDataReflected(Job job) {
898 return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
899 .setExcludeFieldNames("created", "modified", "takenBy")
/**
 * Adds a job, pulls it and pushes the same job back twice.
 * The test passes only if IllegalStateException is thrown.
 */
903 @Test(expectedExceptions = IllegalStateException.class)
904 public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
905 waitForFutureJob(newJobAsync(broker));
906 final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
908 broker.pushBack(job);
909 broker.pushBack(job); //Should fail
// Adds a mock job created with a random user id, peeks it by the UUID returned from
// broker.add, and asserts on the user id found in the peeked job's shared data.
913 public void addJob_PeekItById_verifySameJobWasPeeked() {
914 String userId = UUID.randomUUID().toString();
915 Job myJob = createMockJob(userId);
916 UUID uuid = broker.add(myJob);
917 Job peekedJob = broker.peek(uuid);
// NOTE(review): one assertEquals argument sits on a line not visible here (presumably userId).
918 assertEquals("added testId is not the same as peeked TestsId",
920 peekedJob.getSharedData().getUserId());
/**
 * For each status from the "jobStatusesForSuccessDelete" data provider: creates a job in that
 * status, deletes it by UUID, checks that peeking it shows a non-null deletedAt, and then
 * requests a job. The test passes only if NoJobException is thrown.
 *
 * @param status status the job is created with
 */
923 @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
924 public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
925 final Job job = waitForFutureJob(newJobAsync(broker, status));
926 broker.delete(job.getUuid());
// the job can still be peeked after deletion, and its deletedAt is set
927 assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
928 waitForFutureOptionalJob(pullJobAsync(broker));
// Rows for givenOneJob_deleteIt_canPeekOnItButCantPull, which refers to this provider by the
// name "jobStatusesForSuccessDelete" and takes a single Job.JobStatus per row.
// NOTE(review): the rows themselves are on lines not visible here.
932 public static Object[][] jobStatusesForSuccessDelete() {
933 return new Object[][]{
940 dataProvider = "jobStatusesForFailedDelete",
941 expectedExceptions = OperationNotAllowedException.class,
942 expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
// For each (status, taken) row of "jobStatusesForFailedDelete": adds a mock job in the given
// status and then deletes it by UUID. Per the annotation attributes above, the test passes only
// if OperationNotAllowedException with a message matching
// DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE is thrown.
// NOTE(review): `taken` is not referenced on any visible line; presumably it decides whether
// the pull below is performed before the delete - confirm against the full method.
944 public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
945 final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
948 waitForFutureOptionalJob(pullJobAsync(broker));
952 broker.delete(job.getUuid());
// Rows for deleteJob_notAllowedStatus_exceptionIsThrown: each row is (Job.JobStatus status, boolean taken).
// NOTE(review): only one row is visible here; the remaining rows are on lines not shown.
956 public static Object[][] jobStatusesForFailedDelete() {
957 return new Object[][]{
959 {IN_PROGRESS, false},
/**
 * Adds one PENDING mock job and then asks the broker to delete a fixed UUID (111, 111)
 * that was not taken from that job. The test passes only if OperationNotAllowedException with
 * a message matching DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE is thrown.
 */
966 @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
967 public void deleteJob_notExist_exceptionIsThrown() {
968 waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
969 broker.delete(new UUID(111, 111));
/**
 * AsyncJobRequest implementation used by tests in this file as the request stored in a
 * job's shared data (see createAddAndPullJob); it exposes a {@code value} that
 * twoJobsWithSamePosition_bothJobsArePulled reads back.
 * NOTE(review): the field declaration and the constructor/getter bodies are on lines not visible here.
 */
972 public static class MockAsyncRequest implements AsyncJobRequest {
// no-argument constructor (presumably needed when the request is re-created from storage - confirm)
975 public MockAsyncRequest() {}
// constructor taking the value to carry
977 public MockAsyncRequest(String value) {
// accessor for the carried value
981 public String getValue() {
// Creates and pulls two jobs that share the same UUID argument, the same random position and
// the same user id, differing only in the request value; both request values must come back.
987 public void twoJobsWithSamePosition_bothJobsArePulled(){
988 UUID uuid = UUID.randomUUID();
989 int positionInBulk = RandomUtils.nextInt();
990 String userId = "userId";
992 Optional<Job> firstPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "first value");
993 Optional<Job> secondPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "second value");
// get() is called without a presence check: an empty pull fails the test with NoSuchElementException
995 MockAsyncRequest firstValue = (MockAsyncRequest) firstPulledJob.get().getSharedData().getRequest();
996 MockAsyncRequest secondValue = (MockAsyncRequest) secondPulledJob.get().getSharedData().getRequest();
// the order in which the two jobs were pulled does not matter
997 assertThat(ImmutableList.of(firstValue.value, secondValue.value),
998 containsInAnyOrder("first value", "second value"));
/**
 * Builds a CREATING job with the given position, UUID and user id (date: one second ago,
 * not deleted), attaches shared data holding a MockAsyncRequest with value {@code s},
 * and returns the result of pulling on the CREATING topic for {@code userId}.
 * NOTE(review): the step that adds the job to the broker, implied by the method name,
 * is on a line not visible here.
 *
 * @param uuid           passed as the second argument of createNewJob
 * @param positionInBulk passed as the first argument of createNewJob
 * @param userId         user id of the job; also the second argument of broker.pull
 * @param s              value wrapped in the MockAsyncRequest
 * @return whatever broker.pull(CREATING, userId) returns
 */
1001 private Optional<Job> createAddAndPullJob(UUID uuid, int positionInBulk, String userId, String s) {
1002 JobDaoImpl job1 = createNewJob(positionInBulk, uuid, userId, CREATING, null,
1003 LocalDateTime.now().minusSeconds(1), false);
1004 job1.setSharedData(new JobSharedData(null, userId, new MockAsyncRequest(s), "testApi"));
1006 return broker.pull(CREATING, userId);