2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.vid.services;
24 import static java.util.concurrent.TimeUnit.MILLISECONDS;
25 import static java.util.stream.Collectors.toList;
26 import static org.hamcrest.CoreMatchers.equalTo;
27 import static org.hamcrest.CoreMatchers.is;
28 import static org.hamcrest.MatcherAssert.assertThat;
29 import static org.hamcrest.Matchers.both;
30 import static org.hamcrest.Matchers.containsInAnyOrder;
31 import static org.onap.vid.job.Job.JobStatus.COMPLETED;
32 import static org.onap.vid.job.Job.JobStatus.CREATING;
33 import static org.onap.vid.job.Job.JobStatus.FAILED;
34 import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
35 import static org.onap.vid.job.Job.JobStatus.PAUSE;
36 import static org.onap.vid.job.Job.JobStatus.PENDING;
37 import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
38 import static org.onap.vid.job.Job.JobStatus.STOPPED;
39 import static org.onap.vid.utils.Streams.not;
40 import static org.testng.Assert.assertNotNull;
41 import static org.testng.AssertJUnit.assertEquals;
43 import com.google.common.collect.ImmutableList;
44 import com.google.common.collect.ImmutableMap;
45 import java.lang.reflect.Method;
46 import java.time.LocalDateTime;
47 import java.time.ZoneId;
48 import java.util.Arrays;
49 import java.util.Date;
50 import java.util.List;
51 import java.util.Optional;
53 import java.util.UUID;
54 import java.util.concurrent.ConcurrentSkipListSet;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.ExecutorService;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.Future;
59 import java.util.concurrent.TimeoutException;
60 import java.util.stream.IntStream;
61 import java.util.stream.Stream;
62 import javax.inject.Inject;
63 import org.apache.commons.lang.RandomStringUtils;
64 import org.apache.commons.lang3.RandomUtils;
65 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
66 import org.apache.commons.lang3.builder.ToStringStyle;
67 import org.apache.log4j.LogManager;
68 import org.apache.log4j.Logger;
69 import org.hibernate.SessionFactory;
70 import org.onap.portalsdk.core.domain.support.DomainVo;
71 import org.onap.portalsdk.core.service.DataAccessService;
72 import org.onap.portalsdk.core.util.SystemProperties;
73 import org.onap.vid.config.DataSourceConfig;
74 import org.onap.vid.config.JobAdapterConfig;
75 import org.onap.vid.exceptions.GenericUncheckedException;
76 import org.onap.vid.exceptions.OperationNotAllowedException;
77 import org.onap.vid.job.Job;
78 import org.onap.vid.job.JobAdapter;
79 import org.onap.vid.job.JobType;
80 import org.onap.vid.job.JobsBrokerService;
81 import org.onap.vid.job.command.JobCommandFactoryTest;
82 import org.onap.vid.job.impl.JobDaoImpl;
83 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
84 import org.onap.vid.utils.DaoUtils;
85 import org.springframework.test.context.ContextConfiguration;
86 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
87 import org.testng.Assert;
88 import org.testng.annotations.AfterMethod;
89 import org.testng.annotations.BeforeMethod;
90 import org.testng.annotations.DataProvider;
91 import org.testng.annotations.Test;
93 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
94 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
96 private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
98 private static final int JOBS_COUNT = 127;
99 private static final boolean DELETED = true;
100 private final ExecutorService executor = Executors.newFixedThreadPool(90);
102 private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
104 private final long FEW = 1000;
106 private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
107 private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
108 private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
109 private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
110 private JobsBrokerService broker;
113 JobAdapter jobAdapter;
115 private DataAccessService dataAccessService;
117 private SessionFactory sessionFactory;
120 - pulling jobs is limited to inserted ones
121 - putting back allows getting the job again
122 - multi threads safety
123 - any added job should be visible with view
126 - pulling with empty repo should return empty optional
127 - pulling more than expected should return empty optional
128 - putting one, over-pulling from a different thread
129 - take before inserting, then insert while waiting
133 private class NoJobException extends RuntimeException {
136 private Future<Job> newJobAsync(JobsBrokerService b) {
137 return newJobAsync(b, createMockJob("user id"));
140 private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
141 return newJobAsync(b, createMockJob("user id", status));
144 private Job createMockJob(String userId) {
145 return jobAdapter.createServiceInstantiationJob(
147 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
150 "optimisticUniqueServiceInstanceName",
151 RandomUtils.nextInt());
154 private Job createMockJob(String userId, Job.JobStatus jobStatus) {
155 Job job = createMockJob(userId);
156 job.setStatus(jobStatus);
160 private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
161 final Future<Job> jobFuture = executor.submit(() -> {
171 private void pushBackJobAsync(JobsBrokerService b, Job job) {
172 executor.submit(() -> {
179 private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
180 final Future<Optional<Job>> job = executor.submit(() -> {
182 // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
183 return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
188 private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
190 return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
191 } catch (TimeoutException | InterruptedException | ExecutionException e) {
192 throw new RuntimeException(e);
196 private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
198 return retrievedJobFuture.get(FEW, MILLISECONDS);
199 } catch (TimeoutException | InterruptedException | ExecutionException e) {
200 throw new RuntimeException(e);
204 private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
205 final List<Job> originalJobs = putALotOfJobs(broker);
206 final List<Job> retrievedJobs = getAlotOfJobs(broker);
208 assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
210 return retrievedJobs;
213 private List<Job> putALotOfJobs(JobsBrokerService broker) {
215 return IntStream.range(0, n)
216 .mapToObj(i -> newJobAsync(broker))
217 .map(this::waitForFutureJob)
221 private List<Job> getAlotOfJobs(JobsBrokerService broker) {
223 return IntStream.range(0, n)
224 .mapToObj(i -> pullJobAsync(broker))
225 .map(this::waitForFutureOptionalJob)
229 private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
230 jobs.forEach(job -> pushBackJobAsync(broker, job));
233 private void accountThreadId() {
234 threadsIds.add(Thread.currentThread().getId());
238 public void threadsCounter() {
239 logger.info("participating threads count: " + threadsIds.size());
244 public void initializeBroker() {
245 broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
246 ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
250 public void givenSingleJob_getIt_verifySameJob() {
251 final Job originalJob = waitForFutureJob(newJobAsync(broker));
253 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
254 assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
258 public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
259 final List<Job> originalJobs = putALotOfJobs(broker);
261 MILLISECONDS.sleep(FEW);
262 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
264 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
266 MILLISECONDS.sleep(FEW);
267 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
269 pushBackJobAsync(broker, retrievedJob);
271 MILLISECONDS.sleep(FEW);
272 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
276 public void givenManyJobs_getThemAll_verifySameJobs() {
277 putAndGetALotOfJobs(broker);
281 public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
282 final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
284 pushBackJobs(retrievedJobs1, broker);
285 final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
287 assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
290 private static Date toDate(LocalDateTime localDateTime) {
291 return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
294 private void setModifiedDateToJob(UUID jobUuid, Date date) {
295 DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
296 job.setModified(date);
297 DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
298 session.saveOrUpdate(job);
304 public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
305 return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
308 public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
309 JobDaoImpl job = new JobDaoImpl();
310 job.setUuid(UUID.randomUUID());
311 job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
312 job.setIndexInBulk(indexInBulk);
313 job.setTemplateId(templateId);
314 job.setType(JobType.NoOp);
315 job.setStatus(status);
316 job.setTakenBy(takenBy);
317 job.setCreated(toDate(date));
318 job.setModified(toDate(date));
319 job.setUserId(userId);
321 job.setDeletedAt(new Date());
327 public static Object[][] jobs(Method test) {
328 LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
329 UUID sameTemplate = UUID.randomUUID();
330 return new Object[][]{
332 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
333 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
334 () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
335 () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
339 "Broker should pull the first pending job by oldest date then by job index"
342 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
343 () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
344 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
345 () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
346 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
347 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
348 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
352 "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
355 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
356 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
357 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
361 "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
364 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
365 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
366 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
370 "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
373 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
374 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
375 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
376 () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
381 "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
384 (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
385 () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
386 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
390 "Broker should not pull any job when there is another job from this template that was taken"
393 (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
394 () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
395 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
399 "Broker should not pull any job when there is another job from this template that in progress"
402 (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
403 () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
404 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
408 "Broker should not pull any job when there is another job from this template that was failed"
411 (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
412 () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
413 () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
417 "Broker should pull pending job when there is another job from this template that was deleted, although failed"
420 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
421 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
422 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
426 "Broker should prioritize jobs of user that has no in-progress jobs"
429 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
430 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
431 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
435 "Broker should prioritize jobs of user that has no taken jobs"
438 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
439 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
440 () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
441 () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
442 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
446 "Broker should take oldest job when there is one in-progress job to each user"
449 (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
450 () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
451 () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
455 "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
458 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
459 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
460 () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
461 () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
465 "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
468 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
469 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
470 () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
471 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
472 () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
473 () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
474 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
475 () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
476 () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
481 "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
484 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
485 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
486 () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
487 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
488 () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
489 () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
490 () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
491 () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
492 () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
496 RESOURCE_IN_PROGRESS,
497 "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
500 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
501 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
502 () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
506 "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
509 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
510 () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
511 () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
514 RESOURCE_IN_PROGRESS,
515 "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
518 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
519 () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
520 () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
524 "Broker with creating topic should pull oldest creating job and ignore mso limit"
527 (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
531 "Broker with CREATING topic should pull CREATING job that was just modified"
537 public interface Jobber {
538 // Will defer LocalDateTime.now() to test's "real-time"
542 @Test(dataProvider = "jobs")
543 public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
544 JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
545 final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
546 for (JobDaoImpl job : jobs) {
547 Date modifiedDate = job.getModified();
549 setModifiedDateToJob(job.getUuid(), modifiedDate);
551 Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
552 boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
553 String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
554 Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
555 if (shouldAnyBeSelected) {
556 Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
561 public Object[][] topics() {
562 return Arrays.stream(Job.JobStatus.values())
563 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS).contains(t)))
564 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
567 @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
568 public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
569 broker.pull(topic, UUID.randomUUID().toString());
572 @Test(expectedExceptions = NoJobException.class)
573 public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
574 Stream.of(Job.JobStatus.values())
575 .filter(not(s -> s.equals(PENDING)))
576 .map(s -> createMockJob("some user id", s))
577 .map(job -> newJobAsync(broker, job))
578 .map(this::waitForFutureJob)
581 waitForFutureOptionalJob(pullJobAsync(broker));
585 public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
586 newJobAsync(broker); // this negated the expected result of the call below
587 givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
590 @Test(expectedExceptions = NoJobException.class)
591 public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
592 putAndGetALotOfJobs(broker);
593 waitForFutureOptionalJob(pullJobAsync(broker));
596 @Test(expectedExceptions = NoJobException.class)
597 public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
598 final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
599 assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
600 waitForFutureOptionalJob(futureOptionalJob);
603 @Test(expectedExceptions = IllegalStateException.class)
604 public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
605 waitForFutureJob(newJobAsync(broker));
606 waitForFutureJob(newJobAsync(broker));
607 waitForFutureOptionalJob(pullJobAsync(broker));
609 Job myJob = createMockJob("user id");
610 myJob.setUuid(UUID.randomUUID());
612 broker.pushBack(myJob); //Should fail
616 public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
617 final ImmutableMap<String, Object> randomDataForMostRecentJobType =
618 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
620 waitForFutureJob(newJobAsync(broker));
621 final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
623 job.setStatus(Job.JobStatus.PENDING);
624 job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
625 job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
626 job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
628 broker.pushBack(job);
629 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
631 assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
632 assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
633 assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
636 private static String jobDataReflected(Job job) {
637 return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
638 .setExcludeFieldNames("created", "modified", "takenBy")
642 @Test(expectedExceptions = IllegalStateException.class)
643 public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
644 waitForFutureJob(newJobAsync(broker));
645 final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
647 broker.pushBack(job);
648 broker.pushBack(job); //Should fail
652 public void addJob_PeekItById_verifySameJobWasPeeked() {
653 String userId = UUID.randomUUID().toString();
654 Job myJob = createMockJob(userId);
655 UUID uuid = broker.add(myJob);
656 Job peekedJob = broker.peek(uuid);
657 assertEquals("added testId is not the same as peeked TestsId",
659 peekedJob.getSharedData().getUserId());
662 @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
663 public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
664 final Job job = waitForFutureJob(newJobAsync(broker, status));
665 broker.delete(job.getUuid());
666 assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
667 waitForFutureOptionalJob(pullJobAsync(broker));
671 public static Object[][] jobStatusesForSuccessDelete() {
672 return new Object[][]{
679 dataProvider = "jobStatusesForFailedDelete",
680 expectedExceptions = OperationNotAllowedException.class,
681 expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
683 public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
684 final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
687 waitForFutureOptionalJob(pullJobAsync(broker));
691 broker.delete(job.getUuid());
695 public static Object[][] jobStatusesForFailedDelete() {
696 return new Object[][]{
698 {IN_PROGRESS, false},
705 @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
706 public void deleteJob_notExist_exceptionIsThrown() {
707 waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
708 broker.delete(new UUID(111, 111));