2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.vid.services;
24 import com.google.common.collect.ImmutableList;
25 import com.google.common.collect.ImmutableMap;
26 import org.apache.commons.lang.RandomStringUtils;
27 import org.apache.commons.lang3.RandomUtils;
28 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
29 import org.apache.commons.lang3.builder.ToStringStyle;
30 import org.apache.commons.lang3.tuple.Pair;
31 import org.apache.log4j.LogManager;
32 import org.apache.log4j.Logger;
33 import org.hibernate.SessionFactory;
34 import org.jetbrains.annotations.NotNull;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.onap.portalsdk.core.domain.support.DomainVo;
38 import org.onap.portalsdk.core.service.DataAccessService;
39 import org.onap.portalsdk.core.util.SystemProperties;
40 import org.onap.vid.exceptions.GenericUncheckedException;
41 import org.onap.vid.exceptions.OperationNotAllowedException;
42 import org.onap.vid.job.Job;
43 import org.onap.vid.job.JobAdapter;
44 import org.onap.vid.job.JobType;
45 import org.onap.vid.job.JobsBrokerService;
46 import org.onap.vid.job.command.JobCommandFactoryTest;
47 import org.onap.vid.job.impl.JobDaoImpl;
48 import org.onap.vid.job.impl.JobSchedulerInitializer;
49 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
50 import org.onap.vid.services.VersionService;
51 import org.onap.vid.utils.DaoUtils;
52 import org.onap.vid.config.DataSourceConfig;
53 import org.onap.vid.config.JobAdapterConfig;
54 import org.springframework.test.context.ContextConfiguration;
55 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
56 import org.testng.Assert;
57 import org.testng.annotations.AfterMethod;
58 import org.testng.annotations.BeforeMethod;
59 import org.testng.annotations.DataProvider;
60 import org.testng.annotations.Test;
62 import javax.inject.Inject;
63 import java.lang.reflect.Method;
64 import java.time.LocalDateTime;
65 import java.time.ZoneId;
67 import java.util.concurrent.*;
68 import java.util.stream.Collectors;
69 import java.util.stream.IntStream;
70 import java.util.stream.Stream;
72 import static java.util.concurrent.TimeUnit.MILLISECONDS;
73 import static java.util.stream.Collectors.toList;
74 import static org.hamcrest.CoreMatchers.equalTo;
75 import static org.hamcrest.CoreMatchers.is;
76 import static org.hamcrest.MatcherAssert.assertThat;
77 import static org.hamcrest.Matchers.both;
78 import static org.hamcrest.Matchers.containsInAnyOrder;
79 import static org.mockito.Mockito.when;
80 import static org.onap.vid.job.Job.JobStatus.*;
81 import static org.onap.vid.utils.Streams.not;
82 import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
83 import static org.testng.Assert.assertNotNull;
84 import static org.testng.AssertJUnit.assertEquals;
85 import static org.testng.AssertJUnit.assertFalse;
87 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
88 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
90 private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
92 private static final int JOBS_COUNT = 127;
93 private static final boolean DELETED = true;
94 private final ExecutorService executor = Executors.newFixedThreadPool(90);
96 private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
98 private final long FEW = 500;
100 private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
101 private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
102 private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
103 private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
104 private JobsBrokerService broker;
107 JobAdapter jobAdapter;
109 private DataAccessService dataAccessService;
111 private SessionFactory sessionFactory;
114 private VersionService versionService;
117 public void threadsCounter() {
118 logger.info("participating threads count: " + threadsIds.size());
123 public void initializeBroker() {
124 MockitoAnnotations.initMocks(this);
125 when(versionService.retrieveBuildNumber()).thenReturn("aBuildNumber");
126 broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService);
127 ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
131 - pulling jobs is limited to inserted ones
132 - putting back allows getting the job again
133 - multi threads safety
134 - any added job should be visible with view
137 - pulling with empty repo should return empty optional
138 - pulling more than expected should return empty optional
139 - putting one, over-pulling from a different thread
140 - take before inserting, then insert while waiting
144 private class NoJobException extends RuntimeException {
147 private Future<Job> newJobAsync(JobsBrokerService b) {
148 return newJobAsync(b, createMockJob("user id"));
151 private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
152 return newJobAsync(b, createMockJob("user id", status));
155 private Job createMockJob(String userId) {
156 return jobAdapter.createServiceInstantiationJob(
158 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
162 "optimisticUniqueServiceInstanceName",
163 RandomUtils.nextInt());
166 private Job createMockJob(String userId, Job.JobStatus jobStatus) {
167 Job job = createMockJob(userId);
168 job.setStatus(jobStatus);
172 private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
173 final Future<Job> jobFuture = executor.submit(() -> {
183 private void pushBackJobAsync(JobsBrokerService b, Job job) {
184 executor.submit(() -> {
191 private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
192 final Future<Optional<Job>> job = executor.submit(() -> {
194 // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
195 return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
200 private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
202 return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
203 } catch (TimeoutException | InterruptedException | ExecutionException e) {
204 throw new RuntimeException(e);
208 private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
210 return retrievedJobFuture.get(FEW, MILLISECONDS);
211 } catch (TimeoutException | InterruptedException | ExecutionException e) {
212 throw new RuntimeException(e);
216 private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
217 final List<Job> originalJobs = putALotOfJobs(broker);
218 final List<Job> retrievedJobs = getAlotOfJobs(broker);
220 assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
222 return retrievedJobs;
225 private List<Job> putALotOfJobs(JobsBrokerService broker) {
227 return IntStream.range(0, n)
228 .mapToObj(i -> newJobAsync(broker))
229 .map(this::waitForFutureJob)
233 private List<Job> getAlotOfJobs(JobsBrokerService broker) {
235 return IntStream.range(0, n)
236 .mapToObj(i -> pullJobAsync(broker))
237 .map(this::waitForFutureOptionalJob)
241 private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
242 jobs.forEach(job -> pushBackJobAsync(broker, job));
245 private void accountThreadId() {
246 threadsIds.add(Thread.currentThread().getId());
250 public void givenSingleJob_getIt_verifySameJob() {
251 final Job originalJob = waitForFutureJob(newJobAsync(broker));
253 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
254 assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
258 public static Object[][] allTopics() {
259 return JobSchedulerInitializer.WORKERS_TOPICS.stream()
260 .map(topic -> new Object[] { topic })
261 .toArray(Object[][]::new);
264 @Test(dataProvider = "allTopics")
265 public void givenJobFromSameBuild_pullJobs_jobIsPulled(Job.JobStatus topic) {
266 when(versionService.retrieveBuildNumber()).thenReturn("someVersion");
267 Job mockedJob = createMockJob("user id", topic);
268 UUID uuid = broker.add(mockedJob);
269 assertEquals(uuid, broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
273 @Test(dataProvider = "allTopics")
274 public void givenJobFromOtherBuild_pullJobs_noneIsPulled(Job.JobStatus topic) {
275 when(versionService.retrieveBuildNumber()).thenReturn("old");
276 Job mockedJob = createMockJob("user id", topic);
277 broker.add(mockedJob);
278 when(versionService.retrieveBuildNumber()).thenReturn("new");
279 assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
283 public void givenJobWithNullBuildAndJobWithRealBuild_pullJobs_jobsWithNonNullIsPulled() {
284 Job.JobStatus topic = PENDING;
286 //push job with null build
287 when(versionService.retrieveBuildNumber()).thenReturn(null);
288 broker.add(createMockJob("user id", topic));
290 //push job with "aBuild" build
291 when(versionService.retrieveBuildNumber()).thenReturn("aBuild");
292 UUID newJobId = broker.add(createMockJob("user id", topic));
294 //pull jobs while current build is still "aBuild". Only the non null build is pulled
295 assertEquals(newJobId, broker.pull(topic, UUID.randomUUID().toString()).get().getUuid());
297 //no more jobs to pull
298 assertFalse(broker.pull(topic, UUID.randomUUID().toString()).isPresent());
303 public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
304 final List<Job> originalJobs = putALotOfJobs(broker);
306 MILLISECONDS.sleep(FEW);
307 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
309 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
311 MILLISECONDS.sleep(FEW);
312 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
314 pushBackJobAsync(broker, retrievedJob);
316 MILLISECONDS.sleep(FEW);
317 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
321 public void givenManyJobs_getThemAll_verifySameJobs() {
322 putAndGetALotOfJobs(broker);
326 public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
327 final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
329 pushBackJobs(retrievedJobs1, broker);
330 final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
332 assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
335 private static Date toDate(LocalDateTime localDateTime) {
336 return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
339 private void setModifiedDateToJob(UUID jobUuid, Date date) {
340 DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
341 job.setModified(date);
342 DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
343 session.saveOrUpdate(job);
349 public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
350 return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
353 public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
354 JobDaoImpl job = new JobDaoImpl();
355 job.setUuid(UUID.randomUUID());
356 job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
357 job.setIndexInBulk(indexInBulk);
358 job.setTemplateId(templateId);
359 job.setType(JobType.NoOp);
360 job.setStatus(status);
361 job.setTakenBy(takenBy);
362 job.setCreated(toDate(date));
363 job.setModified(toDate(date));
364 job.setUserId(userId);
366 job.setDeletedAt(new Date());
372 public static Object[][] jobs(Method test) {
373 LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
374 UUID sameTemplate = UUID.randomUUID();
375 return new Object[][]{
377 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
378 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
379 () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
380 () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
384 "Broker should pull the first pending job by oldest date then by job index"
387 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
388 () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
389 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
390 () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
391 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
392 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
393 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
397 "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
400 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
401 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
402 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
406 "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
409 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
410 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
411 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
415 "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
418 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
419 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
420 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
421 () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
426 "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
429 (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
430 () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
431 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
435 "Broker should not pull any job when there is another job from this template that was taken"
438 (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
439 () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
440 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
444 "Broker should not pull any job when there is another job from this template that in progress"
447 (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
448 () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
449 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
453 "Broker should not pull any job when there is another job from this template that was failed"
456 (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
457 () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
458 () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
462 "Broker should pull pending job when there is another job from this template that was deleted, although failed"
465 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
466 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
467 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
471 "Broker should prioritize jobs of user that has no in-progress jobs"
474 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
475 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
476 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
480 "Broker should prioritize jobs of user that has no taken jobs"
483 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
484 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
485 () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
486 () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
487 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
491 "Broker should take oldest job when there is one in-progress job to each user"
494 (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
495 () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
496 () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
500 "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
503 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
504 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
505 () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
506 () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
510 "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
513 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
514 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
515 () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
516 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
517 () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
518 () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
519 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
520 () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
521 () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
526 "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
529 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
530 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
531 () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
532 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
533 () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
534 () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
535 () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
536 () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
537 () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
541 RESOURCE_IN_PROGRESS,
542 "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
545 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
546 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
547 () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
551 "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
554 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
555 () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
556 () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
559 RESOURCE_IN_PROGRESS,
560 "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
563 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
564 () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
565 () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
569 "Broker with creating topic should pull oldest creating job and ignore mso limit"
572 (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
576 "Broker with CREATING topic should pull CREATING job that was just modified"
582 public interface Jobber {
583 // Will defer LocalDateTime.now() to test's "real-time"
587 @Test(dataProvider = "jobs")
588 public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
589 JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService);
590 final List<JobDaoImpl> jobs = addJobsWithModifiedDate(jobbers, aBroker);
591 Optional<Job> nextJob = aBroker.pull(topic, UUID.randomUUID().toString());
592 boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
593 String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
594 Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
595 if (shouldAnyBeSelected) {
596 Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
601 protected List<JobDaoImpl> addJobsWithModifiedDate(List<Jobber> jobbers, JobsBrokerService broker) {
602 final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
603 return addJobsWithModifiedDateByJobDao(jobs, broker);
607 private List<JobDaoImpl> addJobsWithModifiedDateByJobDao(List<JobDaoImpl> jobs, JobsBrokerService broker) {
608 for (JobDaoImpl job : jobs) {
609 Date modifiedDate = job.getModified();
611 setModifiedDateToJob(job.getUuid(), modifiedDate);
617 public static Object[][] jobsForTestingPendingResource(Method test) {
618 UUID templateId1 = UUID.fromString("311a9196-bbc5-47a1-8b11-bf0f9db1c7ca");
619 UUID templateId2 = UUID.fromString("4f1522f9-642e-49f7-af75-a2f344085bcc");
620 return new Object[][]{
621 {ImmutableList.of( (Jobber)
622 () -> createNewJob(12, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
623 () -> createNewJob(1, templateId2, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
624 () -> createNewJob(2, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
625 () -> createNewJob(3, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
626 () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false)
629 "given there is only one in the queue in PENDING_RESOURCE and no other job with same templateId, then this job is selected"
631 {ImmutableList.of( (Jobber)
632 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
633 () -> createNewJob(3, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(2), false),
634 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
637 "given multiple jobs with same templateId in PENDING_RESOURCE, then job with lowest indexInBulk is selected"
639 {ImmutableList.of( (Jobber)
640 () -> createNewJob(1, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
641 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
644 "given multiple jobs with same indexInBulk, then job with lowest templateId is selected"
646 {ImmutableList.of( (Jobber)
647 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
648 () -> createNewJob(2, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false)
651 "given multiple jobs with different indexInBulk and different templateId, then job with lowest indexInBulk is selected"
653 {ImmutableList.of( (Jobber)
654 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
655 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false)
658 "given there is already taken job with same templateId, then no job is selected"
660 {ImmutableList.of( (Jobber)
661 () -> createNewJob(2, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
662 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), false),
663 () -> createNewJob(9, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false),
664 () -> createNewJob(8, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
667 "given 4 jobs, 2 jobs templateId1 but one of them is taken, and 2 jobs with templateId2, then select job with templateId2"
669 {ImmutableList.of( (Jobber)
670 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
671 () -> createNewJob(1, templateId1, "userId", PENDING_RESOURCE, "123", LocalDateTime.now(), true)
674 "given 2 jobs with same templateId, one of them is taken but deleted, then the other job is selected"
676 {ImmutableList.of( (Jobber)
677 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
678 () -> createNewJob(1, templateId1, "userId", IN_PROGRESS, null, LocalDateTime.now(), false)
681 "given 2 jobs with same templateId, one of them is IN_PROGRESS, then no job is selected"
683 {ImmutableList.of( (Jobber)
684 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
685 () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false)
688 "given 2 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS, then no job is selected"
690 {ImmutableList.of( (Jobber)
691 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
692 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
693 () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), true)
696 "given 3 jobs with same templateId, one of them is RESOURCE_IN_PROGRESS but deleted, then other job with lowest indexInBulk is selected"
698 {ImmutableList.of( (Jobber)
699 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
700 () -> createNewJob(5, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(1), false),
701 () -> createNewJob(1, templateId1, "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now(), false),
702 () -> createNewJob(12, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false),
703 () -> createNewJob(11, templateId2, "userId", PENDING_RESOURCE, null, LocalDateTime.now(), false)
706 "given 5 jobs, 3 with templateId1 that one of them is RESOURCE_IN_PROGRESS,"+
707 "2 with templateId2 both in PENDING_RESOURCE, then job with lowest indexInBulk from templateId2 is selected"
710 {ImmutableList.of( (Jobber)
711 () -> createNewJob(6, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), true)
714 "given 1 job in PENDING_RESOURCE but it's deleted, then no job is selected"
716 {ImmutableList.of( (Jobber)
717 () -> createNewJob(20, templateId1, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusSeconds(1), false),
718 () -> createNewJob(1, templateId1, "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
719 () -> createNewJob(2, templateId1, "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
720 () -> createNewJob(3, templateId1, "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
721 () -> createNewJob(4, templateId1, "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
722 () -> createNewJob(5, templateId1, "userId", STOPPED, null, LocalDateTime.now().minusSeconds(1), false),
723 () -> createNewJob(6, templateId1, "userId", PAUSE, null, LocalDateTime.now().minusSeconds(1), false)
726 "given multiple jobs with same templateId, 1 in PENDING_RESOURCE, and other are not in progress, "+
727 "then the job in PENDING_RESOURCE is selected"
729 {ImmutableList.of( (Jobber)
730 () -> createNewJob(1, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1), false),
731 () -> createNewJob(2, UUID.randomUUID(), "userId", COMPLETED, null, LocalDateTime.now().minusSeconds(1), false),
732 () -> createNewJob(3, UUID.randomUUID(), "userId", FAILED, null, LocalDateTime.now().minusSeconds(1), false),
733 () -> createNewJob(4, UUID.randomUUID(), "userId", COMPLETED_WITH_ERRORS, null, LocalDateTime.now().minusSeconds(1), false),
734 () -> createNewJob(5, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false),
735 () -> createNewJob(6, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1), false)
738 "given there is no job in PENDING_RESOURCE state, then no job is selected"
740 {ImmutableList.of( (Jobber)
741 () -> createNewJob(6, null, "userId", PENDING_RESOURCE, null, LocalDateTime.now().minusMinutes(2), false)
744 "given there is 1 job in PENDING_RESOURCE but without templateId, then no job is selected"
749 @Test(dataProvider = "jobsForTestingPendingResource")
750 public void givenSomeJobs_pullPendingResource_returnNextOrNothingAsExpected(List<Jobber> jobbers, int expectedIndexSelected, String assertionReason) {
751 givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(jobbers, 1, expectedIndexSelected, PENDING_RESOURCE, assertionReason);
754 public static JobDaoImpl createNewJob(Job.JobStatus status, String takenBy, long secondsOffset, boolean deleted) {
755 return createNewJob(1, UUID.randomUUID(), "af456", status, takenBy, LocalDateTime.now().minusSeconds(secondsOffset), deleted);
759 public void givenSomeJobs_deleteOldFinalJobs_onlyExpectedJobsAreDeleted() {
761 final List<Pair<JobDaoImpl,Boolean>> jobs = ImmutableList.of(
763 Pair.of(createNewJob(IN_PROGRESS, null, seconds+1, false), true),
764 Pair.of(createNewJob(RESOURCE_IN_PROGRESS, null, seconds+1, false), true),
765 Pair.of(createNewJob(PENDING, null, seconds+1, false), true),
766 Pair.of(createNewJob(CREATING, null, seconds+1, false), true),
767 Pair.of(createNewJob(PENDING_RESOURCE, null, seconds+1, false), true),
768 Pair.of(createNewJob(PAUSE, null, seconds+1, false), true),
771 Pair.of(createNewJob(COMPLETED, null, seconds+1, false), false),
772 Pair.of(createNewJob(FAILED, null, seconds+1, false), false),
773 Pair.of(createNewJob(STOPPED, null, seconds+1, false), false),
774 Pair.of(createNewJob(COMPLETED_WITH_ERRORS, null, seconds+1, true), false),
775 Pair.of(createNewJob(COMPLETED_WITH_NO_ACTION, generateRandomAlphaNumeric(5), seconds+1, true), false),
778 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-2, false), true),
779 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), seconds-400, false), true),
780 Pair.of(createNewJob(COMPLETED, generateRandomAlphaNumeric(5), 0, false), true)
782 addJobsWithModifiedDateByJobDao(jobs.stream().map(Pair::getLeft).collect(Collectors.toList()), broker);
783 assertEquals(jobs.size(), broker.peek().size());
785 broker.deleteOldFinalJobs(seconds);
786 Stream<Pair<UUID, Job.JobStatus>> expectedJobs = jobs.stream()
787 .filter(Pair::getRight)
789 x.getLeft().getUuid(),
790 x.getLeft().getStatus()
792 assertThat(broker.peek().stream().map(x->Pair.of(x.getUuid(), x.getStatus())).collect(Collectors.toList()),
793 containsInAnyOrder(expectedJobs.toArray()));
797 public Object[][] topics() {
798 return Arrays.stream(Job.JobStatus.values())
799 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS, PENDING_RESOURCE).contains(t)))
800 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
803 @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
804 public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
805 broker.pull(topic, UUID.randomUUID().toString());
808 @Test(expectedExceptions = NoJobException.class)
809 public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
810 Stream.of(Job.JobStatus.values())
811 .filter(not(s -> s.equals(PENDING)))
812 .map(s -> createMockJob("some user id", s))
813 .map(job -> newJobAsync(broker, job))
814 .map(this::waitForFutureJob)
817 waitForFutureOptionalJob(pullJobAsync(broker));
821 public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
822 newJobAsync(broker); // this negated the expected result of the call below
823 givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
826 @Test(expectedExceptions = NoJobException.class)
827 public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
828 putAndGetALotOfJobs(broker);
829 waitForFutureOptionalJob(pullJobAsync(broker));
832 @Test(expectedExceptions = NoJobException.class)
833 public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
834 final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
835 assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
836 waitForFutureOptionalJob(futureOptionalJob);
839 @Test(expectedExceptions = IllegalStateException.class)
840 public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
841 waitForFutureJob(newJobAsync(broker));
842 waitForFutureJob(newJobAsync(broker));
843 waitForFutureOptionalJob(pullJobAsync(broker));
845 Job myJob = createMockJob("user id");
846 myJob.setUuid(UUID.randomUUID());
848 broker.pushBack(myJob); //Should fail
852 public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
853 final ImmutableMap<String, Object> randomDataForMostRecentJobType =
854 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
856 waitForFutureJob(newJobAsync(broker));
857 final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
859 job.setStatus(Job.JobStatus.PENDING);
860 job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
861 job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
862 job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
864 broker.pushBack(job);
865 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
867 assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
868 assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
869 assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
872 private static String jobDataReflected(Job job) {
873 return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
874 .setExcludeFieldNames("created", "modified", "takenBy")
878 @Test(expectedExceptions = IllegalStateException.class)
879 public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
880 waitForFutureJob(newJobAsync(broker));
881 final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
883 broker.pushBack(job);
884 broker.pushBack(job); //Should fail
888 public void addJob_PeekItById_verifySameJobWasPeeked() {
889 String userId = UUID.randomUUID().toString();
890 Job myJob = createMockJob(userId);
891 UUID uuid = broker.add(myJob);
892 Job peekedJob = broker.peek(uuid);
893 assertEquals("added testId is not the same as peeked TestsId",
895 peekedJob.getSharedData().getUserId());
898 @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
899 public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
900 final Job job = waitForFutureJob(newJobAsync(broker, status));
901 broker.delete(job.getUuid());
902 assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
903 waitForFutureOptionalJob(pullJobAsync(broker));
907 public static Object[][] jobStatusesForSuccessDelete() {
908 return new Object[][]{
915 dataProvider = "jobStatusesForFailedDelete",
916 expectedExceptions = OperationNotAllowedException.class,
917 expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
919 public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
920 final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
923 waitForFutureOptionalJob(pullJobAsync(broker));
927 broker.delete(job.getUuid());
931 public static Object[][] jobStatusesForFailedDelete() {
932 return new Object[][]{
934 {IN_PROGRESS, false},
941 @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
942 public void deleteJob_notExist_exceptionIsThrown() {
943 waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
944 broker.delete(new UUID(111, 111));