1 package org.onap.vid.services;
4 import com.google.common.collect.ImmutableList;
5 import com.google.common.collect.ImmutableMap;
6 import org.apache.commons.lang.RandomStringUtils;
7 import org.apache.commons.lang3.RandomUtils;
8 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
9 import org.apache.commons.lang3.builder.ToStringStyle;
10 import org.apache.log4j.LogManager;
11 import org.apache.log4j.Logger;
12 import org.hibernate.SessionFactory;
13 import org.onap.portalsdk.core.domain.support.DomainVo;
14 import org.onap.portalsdk.core.service.DataAccessService;
15 import org.onap.portalsdk.core.util.SystemProperties;
16 import org.onap.vid.config.DataSourceConfig;
17 import org.onap.vid.config.JobAdapterConfig;
18 import org.onap.vid.exceptions.GenericUncheckedException;
19 import org.onap.vid.exceptions.OperationNotAllowedException;
20 import org.onap.vid.job.Job;
21 import org.onap.vid.job.JobAdapter;
22 import org.onap.vid.job.JobType;
23 import org.onap.vid.job.JobsBrokerService;
24 import org.onap.vid.job.command.JobCommandFactoryTest;
25 import org.onap.vid.job.impl.JobDaoImpl;
26 import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
27 import org.onap.vid.utils.DaoUtils;
28 import org.springframework.test.context.ContextConfiguration;
29 import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
30 import org.testng.Assert;
31 import org.testng.annotations.AfterMethod;
32 import org.testng.annotations.BeforeMethod;
33 import org.testng.annotations.DataProvider;
34 import org.testng.annotations.Test;
36 import javax.inject.Inject;
37 import java.lang.reflect.Method;
38 import java.time.LocalDateTime;
39 import java.time.ZoneId;
41 import java.util.concurrent.*;
42 import java.util.stream.IntStream;
43 import java.util.stream.Stream;
45 import static java.util.concurrent.TimeUnit.MILLISECONDS;
46 import static java.util.stream.Collectors.toList;
47 import static org.hamcrest.CoreMatchers.equalTo;
48 import static org.hamcrest.CoreMatchers.is;
49 import static org.hamcrest.MatcherAssert.assertThat;
50 import static org.hamcrest.Matchers.both;
51 import static org.hamcrest.Matchers.containsInAnyOrder;
52 import static org.onap.vid.job.Job.JobStatus.*;
53 import static org.onap.vid.utils.Streams.not;
54 import static org.testng.Assert.assertNotNull;
55 import static org.testng.AssertJUnit.assertEquals;
57 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
58 public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
// Logger of this test class; threadsCounter() uses it to report the number of participating threads.
60 private static final Logger logger = LogManager.getLogger(JobsBrokerServiceTest.class);
// NOTE(review): presumably the size of the "a lot of jobs" batches - the use site is not visible in this listing.
62 private static final int JOBS_COUNT = 127;
// Readable value for the 'deleted' argument of createNewJob(...).
63 private static final boolean DELETED = true;
// Pool on which the *Async helpers submit their work.
// NOTE(review): no shutdown of this executor is visible in this listing.
64 private final ExecutorService executor = Executors.newFixedThreadPool(90);
// Ids of the threads recorded through accountThreadId().
66 private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
// Amount of milliseconds used as the Future.get(...) timeout and as the sleep between peeks.
68 private final long FEW = 500;
// Reason texts passed to assertThat(...).
70 private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
71 private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
// Message patterns used as expectedExceptionsMessageRegExp by the delete tests.
72 private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
73 private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
// Broker under test; assigned in initializeBroker().
74 private JobsBrokerService broker;
// NOTE(review): the three collaborators below are never assigned in the visible code -
// presumably they are injected (javax.inject.Inject is imported); confirm.
77 JobAdapter jobAdapter;
79 private DataAccessService dataAccessService;
81 private SessionFactory sessionFactory;
84 - pulling jobs is limited to inserted ones
85 - putting back allows getting the job again
86 - multi-thread safety
87 - any added job should be visible with view
90 - pulling with empty repo should return empty optional
91 - pulling more than expected should return empty optional
92 - putting one, over-pulling from a different thread
93 - take before inserting, then insert while waiting
// Marker exception: waitForFutureOptionalJob(...) throws it when the pulled Optional is empty,
// and several tests name it in expectedExceptions.
// NOTE(review): nothing visible needs the enclosing instance, so this could be a static nested class.
97 private class NoJobException extends RuntimeException {
/**
 * Convenience overload: creates a mock job for the literal user id "user id" and
 * delegates to {@code newJobAsync(b, job)}.
 *
 * @param b broker handed on to the three-argument-free overload
 * @return the future returned by {@code newJobAsync(b, job)}
 */
100 private Future<Job> newJobAsync(JobsBrokerService b) {
101 return newJobAsync(b, createMockJob("user id"));
/**
 * Convenience overload: creates a mock job for the literal user id "user id" with the
 * given status and delegates to {@code newJobAsync(b, job)}.
 *
 * @param b      broker handed on to {@code newJobAsync(b, job)}
 * @param status status set on the mock job before it is handed on
 * @return the future returned by {@code newJobAsync(b, job)}
 */
104 private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
105 return newJobAsync(b, createMockJob("user id", status));
/**
 * Builds a job through {@code jobAdapter.createServiceInstantiationJob(...)}.
 * Visible arguments are a {@code MockedRequest(42, "nothing")}, the name
 * "optimisticUniqueServiceInstanceName" and a random int.
 * NOTE(review): some arguments (including where {@code userId} is passed) are on lines
 * not visible in this listing.
 *
 * @param userId user id for the created job (use not visible here - confirm)
 * @return the job returned by the job adapter
 */
108 private Job createMockJob(String userId) {
109 return jobAdapter.createServiceInstantiationJob(
111 new JobCommandFactoryTest.MockedRequest(42,"nothing") ,
114 "optimisticUniqueServiceInstanceName",
115 RandomUtils.nextInt());
/**
 * Creates a mock job via {@code createMockJob(userId)} and overrides its status.
 *
 * @param userId    passed on to {@code createMockJob(userId)}
 * @param jobStatus status set on the created job
 * @return presumably the modified job (the return statement is not visible in this listing)
 */
118 private Job createMockJob(String userId, Job.JobStatus jobStatus) {
119 Job job = createMockJob(userId);
120 job.setStatus(jobStatus);
/**
 * Submits a task for {@code job} to the shared executor and keeps the resulting future.
 * NOTE(review): the body of the submitted task is not visible in this listing -
 * presumably it adds {@code job} to broker {@code b}; confirm.
 */
124 private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
125 final Future<Job> jobFuture = executor.submit(() -> {
/**
 * Submits a task to the shared executor; the method returns nothing, so callers cannot
 * wait for that task to finish.
 * NOTE(review): the task body is not visible in this listing - presumably it pushes
 * {@code job} back to broker {@code b}; confirm.
 */
135 private void pushBackJobAsync(JobsBrokerService b, Job job) {
136 executor.submit(() -> {
/**
 * Submits a task to the shared executor that pulls one job from {@code broker} on the
 * PENDING topic, passing a fresh random UUID string as the second argument of
 * {@code pull}.
 *
 * @param broker broker to pull from
 * @return presumably the future of the optional pulled job (the return statement is not
 *         visible in this listing)
 */
143 private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
144 final Future<Optional<Job>> job = executor.submit(() -> {
146 // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
147 return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
/**
 * Waits up to FEW milliseconds for the future and unwraps the optional job.
 *
 * @param retrievedOptionalJobFuture future of an optional job
 * @return the job held by the optional
 * @throws NoJobException   when the optional is empty
 * @throws RuntimeException wrapping a TimeoutException, InterruptedException or ExecutionException
 */
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag.
152 private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
154 return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
155 } catch (TimeoutException | InterruptedException | ExecutionException e) {
156 throw new RuntimeException(e);
/**
 * Waits up to FEW milliseconds for the future and returns its job.
 *
 * @param retrievedJobFuture future of a job
 * @return the value the future completed with
 * @throws RuntimeException wrapping a TimeoutException, InterruptedException or ExecutionException
 */
// NOTE(review): InterruptedException is wrapped without restoring the thread's interrupt flag.
160 private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
162 return retrievedJobFuture.get(FEW, MILLISECONDS);
163 } catch (TimeoutException | InterruptedException | ExecutionException e) {
164 throw new RuntimeException(e);
/**
 * Puts a batch of jobs into the broker, pulls a batch back and asserts that the pulled
 * jobs are exactly the added ones, in any order.
 *
 * @param broker broker used for both putting and pulling
 * @return the pulled jobs
 */
168 private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
169 final List<Job> originalJobs = putALotOfJobs(broker);
170 final List<Job> retrievedJobs = getAlotOfJobs(broker);
172 assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
174 return retrievedJobs;
/**
 * For each index in [0, n) calls {@code newJobAsync(broker)} and waits for the resulting
 * future. {@code n} and the terminal stream operation are on lines not visible in this
 * listing.
 */
// NOTE(review): as written the stream is sequential, so each future is awaited before the
// next job is submitted - unless a line not shown here makes the stream parallel.
177 private List<Job> putALotOfJobs(JobsBrokerService broker) {
179 return IntStream.range(0, n)
180 .mapToObj(i -> newJobAsync(broker))
181 .map(this::waitForFutureJob)
/**
 * For each index in [0, n) calls {@code pullJobAsync(broker)} and waits for the resulting
 * future; an empty pull surfaces as NoJobException from waitForFutureOptionalJob.
 * {@code n} and the terminal stream operation are on lines not visible in this listing.
 */
// NOTE(review): as written the stream is sequential, so each pull is awaited before the
// next one is submitted - unless a line not shown here makes the stream parallel.
185 private List<Job> getAlotOfJobs(JobsBrokerService broker) {
187 return IntStream.range(0, n)
188 .mapToObj(i -> pullJobAsync(broker))
189 .map(this::waitForFutureOptionalJob)
/**
 * Calls {@code pushBackJobAsync(broker, job)} for every job in the list.
 * pushBackJobAsync returns nothing, so this method does not wait for the submitted tasks.
 */
193 private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
194 jobs.forEach(job -> pushBackJobAsync(broker, job));
/** Records the id of the calling thread in {@code threadsIds}. */
197 private void accountThreadId() {
198 threadsIds.add(Thread.currentThread().getId());
/**
 * Logs how many distinct thread ids have been recorded in {@code threadsIds}.
 * NOTE(review): presumably a TestNG after-method hook (AfterMethod is imported); the
 * annotation line is not visible in this listing.
 */
202 public void threadsCounter() {
203 logger.info("participating threads count: " + threadsIds.size());
/**
 * Replaces {@code broker} with a new database-backed implementation and removes all jobs
 * through {@code deleteAll()}.
 * NOTE(review): presumably a TestNG before-method hook (BeforeMethod is imported); the
 * annotation line is not visible in this listing.
 */
208 public void initializeBroker() {
// Third argument is the MSO limit (the data-driven test passes msoLimit in that position);
// NOTE(review): the fourth (0 here, 20 in the data-driven test) presumably is an interval in seconds - confirm.
209 broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
210 ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
/** Adds one job, pulls one job and asserts both are the same job. Currently disabled. */
213 @Test(enabled = false)
214 public void givenSingleJob_getIt_verifySameJob() {
215 final Job originalJob = waitForFutureJob(newJobAsync(broker));
217 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
218 assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
/**
 * Asserts that {@code broker.peek()} returns all added jobs at three moments: after
 * adding, after one job was pulled, and after that job was pushed back. Each check is
 * preceded by a sleep of FEW milliseconds. Currently disabled.
 */
221 @Test(enabled = false)
222 public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
223 final List<Job> originalJobs = putALotOfJobs(broker);
225 MILLISECONDS.sleep(FEW);
226 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
// A pulled job must still be visible through peek().
228 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
230 MILLISECONDS.sleep(FEW);
231 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
// The push-back runs asynchronously; the sleep below gives it time before the last peek.
233 pushBackJobAsync(broker, retrievedJob);
235 MILLISECONDS.sleep(FEW);
236 assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
/** Runs putAndGetALotOfJobs, which itself asserts that pulled jobs match the added ones. Currently disabled. */
239 @Test(enabled = false)
240 public void givenManyJobs_getThemAll_verifySameJobs() {
241 putAndGetALotOfJobs(broker);
/**
 * Pulls all added jobs, pushes them back, pulls again and asserts that the second pull
 * returns the same jobs as the first, in any order. Currently disabled.
 */
// NOTE(review): pushBackJobs does not wait for its asynchronous push-backs before the second pull starts.
244 @Test(enabled = false)
245 public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
246 final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
248 pushBackJobs(retrievedJobs1, broker);
249 final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
251 assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
/**
 * Converts a LocalDateTime to a java.util.Date, interpreting it in the system default
 * time zone.
 *
 * @param localDateTime local date-time to convert
 * @return the corresponding Date
 */
254 private static Date toDate(LocalDateTime localDateTime) {
255 return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
/**
 * Loads the JobDaoImpl with the given uuid through dataAccessService, sets its modified
 * date and stores it with {@code session.saveOrUpdate} inside
 * {@code DaoUtils.tryWithSessionAndTransaction}. The rest of the lambda is not visible
 * in this listing.
 *
 * @param jobUuid uuid of the stored job to update
 * @param date    value for the job's modified date
 */
258 private void setModifiedDateToJob(UUID jobUuid, Date date) {
259 DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
260 job.setModified(date);
261 DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
262 session.saveOrUpdate(job);
/** Overload of the seven-argument createNewJob that passes {@code false} for {@code deleted}. */
268 public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
269 return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
/**
 * Builds a JobDaoImpl with a random uuid, type NoOp and random 15-character data under
 * key "x", and copies the given index, template, status, taker and user onto it.
 * {@code date} is used for both the created and the modified timestamp.
 *
 * @param deleted NOTE(review): presumably guards the setDeletedAt call - the guarding
 *                line is not visible in this listing
 */
272 public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
273 JobDaoImpl job = new JobDaoImpl();
274 job.setUuid(UUID.randomUUID());
275 job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
276 job.setIndexInBulk(indexInBulk);
277 job.setTemplateId(templateId);
// NOTE(review): type NoOp was already passed to setTypeAndData above; this call may be redundant - confirm.
278 job.setType(JobType.NoOp);
279 job.setStatus(status);
280 job.setTakenBy(takenBy);
281 job.setCreated(toDate(date));
282 job.setModified(toDate(date));
283 job.setUserId(userId);
285 job.setDeletedAt(new Date());
/**
 * Scenario table; presumably the "jobs" data provider consumed by
 * givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected (the annotation line is not
 * visible in this listing). Each visible row holds a group of Jobber lambdas, i.e.
 * deferred createNewJob calls, and ends with the assertion-reason text. The remaining
 * row values are on lines not shown here, apart from two RESOURCE_IN_PROGRESS entries.
 * {@code oldestDate} is 30 hours before now; {@code sameTemplate} is one template id
 * shared by the rows that exercise same-template behaviour.
 */
291 public static Object[][] jobs(Method test) {
292 LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
293 UUID sameTemplate = UUID.randomUUID();
294 return new Object[][]{
296 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
297 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
298 () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
299 () -> createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
303 "Broker should pull the first pending job by oldest date then by job index"
306 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
307 () -> createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),
308 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
309 () -> createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
310 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
311 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
312 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
316 "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
319 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
320 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
321 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
325 "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
328 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
329 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
330 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
334 "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
337 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
338 () -> createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
339 () -> createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
340 () -> createNewJob(12, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, UUID.randomUUID().toString(), oldestDate)
345 "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
348 (Jobber)() -> createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
349 () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
350 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
354 "Broker should not pull any job when there is another job from this template that was taken"
357 (Jobber)() -> createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
358 () -> createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
359 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
363 "Broker should not pull any job when there is another job from this template that in progress"
366 (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
367 () -> createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
368 () -> createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
372 "Broker should not pull any job when there is another job from this template that was failed"
375 (Jobber)() -> createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
376 () -> createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
377 () -> createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
381 "Broker should pull pending job when there is another job from this template that was deleted, although failed"
384 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
385 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
386 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
390 "Broker should prioritize jobs of user that has no in-progress jobs"
393 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
394 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
395 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
399 "Broker should prioritize jobs of user that has no taken jobs"
402 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
403 () -> createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
404 () -> createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
405 () -> createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
406 () -> createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
410 "Broker should take oldest job when there is one in-progress job to each user"
413 (Jobber)() -> createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
414 () -> createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
415 () -> createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
419 "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
422 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
423 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
424 () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
425 () -> createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
429 "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
432 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
433 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
434 () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
435 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
436 () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
437 () -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate, DELETED),
438 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
439 () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
440 () -> createNewJob(16, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate)
445 "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore all other statuses"
448 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
449 () -> createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
450 () -> createNewJob(13, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
451 () -> createNewJob(14, UUID.randomUUID(), "userId", STOPPED, null, oldestDate),
452 () -> createNewJob(15, UUID.randomUUID(), "userId", CREATING, null, oldestDate),
453 () -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate, DELETED),
454 () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, oldestDate),
455 () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
456 () -> createNewJob(16, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate)
460 RESOURCE_IN_PROGRESS,
461 "Broker with RESOURCE_IN_PROGRESS topic should pull only RESOURCE_IN_PROGRESS jobs - first RESOURCE_IN_PROGRESS job by oldest date - ignore all other statuses"
464 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
465 () -> createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
466 () -> createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
470 "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
473 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now()),
474 () -> createNewJob(22, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
475 () -> createNewJob(33, UUID.randomUUID(), "userId", RESOURCE_IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
478 RESOURCE_IN_PROGRESS,
479 "Broker with RESOURCE_IN_PROGRESS topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
482 (Jobber)() -> createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now()),
483 () -> createNewJob(22, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusSeconds(1)),
484 () -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now().minusHours(2))),
488 "Broker with creating topic should pull oldest creating job and ignore mso limit"
491 (Jobber)() -> createNewJob(33, UUID.randomUUID(), "userId", CREATING, null, LocalDateTime.now())),
495 "Broker with CREATING topic should pull CREATING job that was just modified"
/**
 * Deferred job factory used by the scenario table: the rows hold lambdas of this type,
 * and the data-driven test turns them into jobs through {@code Jobber::toJob}. The
 * method declaration itself is not visible in this listing.
 */
501 public interface Jobber {
502 // Will defer LocalDateTime.now() to test's "real-time"
/**
 * Data-driven check of which job {@code pull} selects. Builds a broker with the row's
 * MSO limit and 20 as its fourth constructor argument, materialises the jobs, pulls once
 * on {@code topic} and compares the outcome with the expectation. Currently disabled.
 *
 * @param jobbers               deferred job factories of the scenario
 * @param msoLimit              third constructor argument of the broker
 * @param expectedIndexSelected index into the materialised jobs of the job expected to be
 *                              pulled; a negative value means no job is expected
 * @param topic                 status passed to {@code pull}
 * @param assertionReason       message used when an assertion fails
 */
506 @Test(enabled = false, dataProvider = "jobs")
507 public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
508 JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
509 final List<JobDaoImpl> jobs = jobbers.stream().map(Jobber::toJob).collect(toList());
510 for (JobDaoImpl job : jobs) {
// Remember the intended modified date, then write it onto the stored job.
// NOTE(review): one statement between these two lines is not visible in this listing -
// presumably the job is stored there; confirm.
511 Date modifiedDate = job.getModified();
513 setModifiedDateToJob(job.getUuid(), modifiedDate);
515 Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
516 boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
517 String pulledJobDesc = nextJob.map(job -> ". pulled job: " + job.toString()).orElse(". no job pulled");
518 Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason+pulledJobDesc);
519 if (shouldAnyBeSelected) {
// NOTE(review): TestNG's Assert.assertEquals takes (actual, expected, message); the expected job
// is passed first here, so a failure message would label the two values the wrong way round - confirm.
520 Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
/**
 * Returns every Job.JobStatus value except PENDING, IN_PROGRESS, CREATING and
 * RESOURCE_IN_PROGRESS, each wrapped in a one-element row.
 * NOTE(review): presumably the "topics" data provider; the annotation line is not
 * visible in this listing.
 */
525 public Object[][] topics() {
526 return Arrays.stream(Job.JobStatus.values())
527 .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS, CREATING, RESOURCE_IN_PROGRESS).contains(t)))
528 .map(v -> new Object[]{v}).collect(toList()).toArray(new Object[][]{});
/**
 * Pulling on a topic supplied by the "topics" provider is expected to throw
 * GenericUncheckedException with a message matching "Unsupported topic.*".
 * Currently disabled.
 */
531 @Test(enabled = false, dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
532 public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
533 broker.pull(topic, UUID.randomUUID().toString());
/**
 * Creates one mock job for every status other than PENDING, then pulls on the PENDING
 * topic and expects NoJobException, i.e. an empty pull. The terminal operation of the
 * stream is on a line not visible in this listing. Currently disabled.
 */
536 @Test(enabled = false, expectedExceptions = NoJobException.class)
537 public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
538 Stream.of(Job.JobStatus.values())
539 .filter(not(s -> s.equals(PENDING)))
540 .map(s -> createMockJob("some user id", s))
541 .map(job -> newJobAsync(broker, job))
542 .map(this::waitForFutureJob)
545 waitForFutureOptionalJob(pullJobAsync(broker));
/**
 * Counterpart of givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved: a job
 * is created through newJobAsync(broker) first, and the other test's body is then
 * expected to finish without NoJobException. Currently disabled.
 */
// NOTE(review): the future returned by newJobAsync is not awaited before the body below runs.
548 @Test(enabled = false)
549 public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
550 newJobAsync(broker); // this negates the expected result of the call below
551 givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
/**
 * After all added jobs were pulled, one more pull is expected to come back empty, which
 * surfaces as NoJobException. Currently disabled.
 */
554 @Test(enabled = false, expectedExceptions = NoJobException.class)
555 public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
556 putAndGetALotOfJobs(broker);
557 waitForFutureOptionalJob(pullJobAsync(broker));
/**
 * Pulls without adding any job in this test: first asserts that the pulled optional is
 * empty, then unwraps the same future through waitForFutureOptionalJob, which is
 * expected to throw NoJobException. Currently disabled.
 */
560 @Test(enabled = false, expectedExceptions = NoJobException.class)
561 public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
562 final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
563 assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
564 waitForFutureOptionalJob(futureOptionalJob);
/**
 * Adds two jobs, pulls one, then pushes back a different mock job carrying a fresh random
 * uuid; the push-back is expected to throw IllegalStateException. Currently disabled.
 */
567 @Test(enabled = false, expectedExceptions = IllegalStateException.class)
568 public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
569 waitForFutureJob(newJobAsync(broker));
570 waitForFutureJob(newJobAsync(broker));
571 waitForFutureOptionalJob(pullJobAsync(broker));
// A job that was never added through the broker in this test.
573 Job myJob = createMockJob("user id");
574 myJob.setUuid(UUID.randomUUID());
576 broker.pushBack(myJob); //Should fail
/**
 * Pulls a job, changes its status and overwrites its type and data three times, pushes it
 * back and pulls again. The re-pulled job must equal the pushed one, carry the data set
 * last, and match it in a reflective dump (see jobDataReflected). Currently disabled.
 */
579 @Test(enabled = false)
580 public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
581 final ImmutableMap<String, Object> randomDataForMostRecentJobType =
582 ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
584 waitForFutureJob(newJobAsync(broker));
585 final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
// PENDING so that the job can be pulled again (pullJobAsync only pulls the PENDING topic).
587 job.setStatus(Job.JobStatus.PENDING);
// Three successive type/data assignments; the assertions below check against the last one.
588 job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
589 job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
590 job.setTypeAndData(JobType.MacroServiceInstantiation, randomDataForMostRecentJobType);
592 broker.pushBack(job);
593 final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
595 assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
596 assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
597 assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
/**
 * Builds a reflective dump of {@code job} in SHORT_PREFIX_STYLE that leaves out the
 * fields "created", "modified" and "takenBy". The end of the call chain is on a line not
 * visible in this listing.
 */
600 private static String jobDataReflected(Job job) {
601 return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
602 .setExcludeFieldNames("created", "modified", "takenBy")
/**
 * Pushes the same pulled job back twice; the second push-back is expected to throw
 * IllegalStateException. Currently disabled.
 */
606 @Test(enabled = false, expectedExceptions = IllegalStateException.class)
607 public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
608 waitForFutureJob(newJobAsync(broker));
609 final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
611 broker.pushBack(job);
612 broker.pushBack(job); //Should fail
/**
 * Adds a job created for a random user id, peeks it by the uuid returned from
 * {@code add} and compares against the peeked job's shared-data user id. The expected
 * value of the assertion is on a line not visible in this listing. Currently disabled.
 */
615 @Test(enabled = false)
616 public void addJob_PeekItById_verifySameJobWasPeeked() {
617 String userId = UUID.randomUUID().toString();
618 Job myJob = createMockJob(userId);
619 UUID uuid = broker.add(myJob);
620 Job peekedJob = broker.peek(uuid);
621 assertEquals("added testId is not the same as peeked TestsId",
623 peekedJob.getSharedData().getUserId());
/**
 * Deletes a job of the given status, asserts that peeking it by uuid returns a job whose
 * deletedAt is set, and expects the following pull to come back empty (NoJobException).
 * Statuses come from the "jobStatusesForSuccessDelete" provider. Currently disabled.
 */
626 @Test(enabled = false, dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
627 public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
628 final Job job = waitForFutureJob(newJobAsync(broker, status));
629 broker.delete(job.getUuid());
630 assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
631 waitForFutureOptionalJob(pullJobAsync(broker));
/**
 * Rows for givenOneJob_deleteIt_canPeekOnItButCantPull, which takes a single
 * Job.JobStatus per row. The rows themselves are on lines not visible in this listing.
 */
635 public static Object[][] jobStatusesForSuccessDelete() {
636 return new Object[][]{
/**
 * Deleting a job whose state does not allow deletion is expected to throw
 * OperationNotAllowedException with the "status does not allow deletion" message.
 * Rows come from the "jobStatusesForFailedDelete" provider. Currently disabled.
 * NOTE(review): the lines around the pull are not visible in this listing - presumably
 * the pull only happens when {@code taken} is true; confirm.
 */
642 @Test(enabled = false,
643 dataProvider = "jobStatusesForFailedDelete",
644 expectedExceptions = OperationNotAllowedException.class,
645 expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
647 public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
648 final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
651 waitForFutureOptionalJob(pullJobAsync(broker));
655 broker.delete(job.getUuid());
/**
 * Rows of (status, taken) for deleteJob_notAllowedStatus_exceptionIsThrown. Only the
 * {IN_PROGRESS, false} row is visible in this listing.
 */
659 public static Object[][] jobStatusesForFailedDelete() {
660 return new Object[][]{
662 {IN_PROGRESS, false},
/**
 * Deleting by a uuid that differs from the added job's is expected to throw
 * OperationNotAllowedException with the "does not exist" message. Currently disabled.
 */
669 @Test(enabled = false, expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
670 public void deleteJob_notExist_exceptionIsThrown() {
671 waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
// Fixed uuid (most/least significant bits 111) that was not returned by any add in this test.
672 broker.delete(new UUID(111, 111));