ff4b34f5f4733c7465d17ec2f2697d6a5b2da136
[vid.git] / vid-app-common / src / test / java / org / onap / vid / services / JobsBrokerServiceTest.java
1 package org.onap.vid.services;
2
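3 // NOTE(review): every line below, imports included, is commented out, so none of these tests compile or run. TODO: record why the class was disabled, then re-enable or delete it.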
4 //import com.google.common.collect.ImmutableList;
5 //import com.google.common.collect.ImmutableMap;
6 //import org.apache.commons.lang.RandomStringUtils;
7 //import org.apache.commons.lang3.RandomUtils;
8 //import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
9 //import org.apache.commons.lang3.builder.ToStringStyle;
10 //import org.hibernate.SessionFactory;
11 //import org.onap.vid.exceptions.GenericUncheckedException;
12 //import org.onap.vid.exceptions.OperationNotAllowedException;
13 //import org.onap.vid.job.Job;
14 //import org.onap.vid.job.JobAdapter;
15 //import org.onap.vid.job.JobType;
16 //import org.onap.vid.job.JobsBrokerService;
17 //import org.onap.vid.job.impl.JobDaoImpl;
18 //import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
19 //import org.onap.vid.utils.DaoUtils;
20 //import org.onap.vid.config.DataSourceConfig;
21 //import org.onap.vid.config.JobAdapterConfig;
22 //import org.onap.portalsdk.core.domain.support.DomainVo;
23 //import org.onap.portalsdk.core.service.DataAccessService;
24 //import org.onap.portalsdk.core.util.SystemProperties;
25 //import org.springframework.test.context.ContextConfiguration;
26 //import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
27 //import org.testng.Assert;
28 //import org.testng.annotations.AfterMethod;
29 //import org.testng.annotations.BeforeMethod;
30 //import org.testng.annotations.DataProvider;
31 //import org.testng.annotations.Test;
32 //
33 //import javax.inject.Inject;
34 //import java.lang.reflect.Method;
35 //import java.time.LocalDateTime;
36 //import java.time.ZoneId;
37 //import java.util.*;
38 //import java.util.concurrent.*;
39 //import java.util.stream.Collectors;
40 //import java.util.stream.IntStream;
41 //import java.util.stream.Stream;
42 //
43 //import static java.util.concurrent.TimeUnit.MILLISECONDS;
44 //import static org.hamcrest.CoreMatchers.equalTo;
45 //import static org.hamcrest.CoreMatchers.is;
46 //import static org.hamcrest.MatcherAssert.assertThat;
47 //import static org.hamcrest.Matchers.both;
48 //import static org.hamcrest.Matchers.containsInAnyOrder;
49 //import static org.onap.vid.job.Job.JobStatus.*;
50 //import static org.onap.vid.utils.Streams.not;
51 //import static org.testng.Assert.assertNotNull;
52 //import static org.testng.AssertJUnit.assertEquals;
53 //
54 //@ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
55 //public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
56 //
57 //    private static final int JOBS_COUNT = 127;
58 //    private static final boolean DELETED = true;
59 //    private final ExecutorService executor = Executors.newFixedThreadPool(90);
60 //
61 //    private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
62 //
63 //    private final long FEW = 500;
64 //
65 //    private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
66 //    private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
67 //    private static final String DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE = "Service status does not allow deletion from the queue";
68 //    private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
69 //    private JobsBrokerService broker;
70 //
71 //    @Inject
72 //    JobAdapter jobAdapter;
73 //    @Inject
74 //    private DataAccessService dataAccessService;
75 //    @Inject
76 //    private SessionFactory sessionFactory;
77 //
78 //    /*
79 //    - pulling jobs is limited to the jobs that were inserted
80 //    - pushing a job back allows pulling it again
81 //    - multi-thread safety
82 //    - any added job should be visible via peek
83 //
84 //    - edge cases:
85 //        - pulling from an empty repo should return an empty Optional
86 //        - pulling more jobs than were inserted should return an empty Optional
87 //        - putting one, over-pulling from a different thread
88 //        - take before inserting, then insert while waiting
89 //
90 //     */
91 //
92 //    private class NoJobException extends RuntimeException {
93 //    }
94 //
95 //    private Future<Job> newJobAsync(JobsBrokerService b) {
96 //        return newJobAsync(b, createMockJob("user id"));
97 //    }
98 //
99 //    private Future<Job> newJobAsync(JobsBrokerService b, Job.JobStatus status) {
100 //        return newJobAsync(b, createMockJob("user id", status));
101 //    }
102 //
103 //    private Job createMockJob(String userId) {
104 //        return jobAdapter.createJob(
105 //                JobType.NoOp,
106 //                new JobAdapter.AsyncJobRequest() {
107 //                    public int nothing = 42;
108 //                },
109 //                UUID.randomUUID(),
110 //                userId,
111 //                RandomUtils.nextInt());
112 //    }
113 //
114 //    private Job createMockJob(String userId, Job.JobStatus jobStatus) {
115 //        Job job = createMockJob(userId);
116 //        job.setStatus(jobStatus);
117 //        return job;
118 //    }
119 //
120 //    private Future<Job> newJobAsync(JobsBrokerService b, Job job) {
121 //        final Future<Job> jobFuture = executor.submit(() -> {
122 //            accountThreadId();
123 //
124 //            b.add(job);
125 //
126 //            return job;
127 //        });
128 //        return jobFuture;
129 //    }
130 //
131 //    private void pushBackJobAsync(JobsBrokerService b, Job job) {
132 //        executor.submit(() -> {
133 //            accountThreadId();
134 //            b.pushBack(job);
135 //            return job;
136 //        });
137 //    }
138 //
139 //    private Future<Optional<Job>> pullJobAsync(JobsBrokerService broker) {
140 //        final Future<Optional<Job>> job = executor.submit(() -> {
141 //            accountThreadId();
142 //            // Pull only pending jobs, as H2 database does not support our SQL for in-progress jobs
143 //            return broker.pull(Job.JobStatus.PENDING, UUID.randomUUID().toString());
144 //        });
145 //        return job;
146 //    }
147 //
148 //    private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
149 //        try {
150 //            return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
151 //        } catch (TimeoutException | InterruptedException | ExecutionException e) {
152 //            throw new RuntimeException(e);
153 //        }
154 //    }
155 //
156 //    private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
157 //        try {
158 //            return retrievedJobFuture.get(FEW, MILLISECONDS);
159 //        } catch (TimeoutException | InterruptedException | ExecutionException e) {
160 //            throw new RuntimeException(e);
161 //        }
162 //    }
163 //
164 //    private List<Job> putAndGetALotOfJobs(JobsBrokerService broker) {
165 //        final List<Job> originalJobs = putALotOfJobs(broker);
166 //        final List<Job> retrievedJobs = getAlotOfJobs(broker);
167 //
168 //        assertThat(JOBS_SHOULD_MATCH, retrievedJobs, containsInAnyOrder(originalJobs.toArray()));
169 //
170 //        return retrievedJobs;
171 //    }
172 //
173 //    private List<Job> putALotOfJobs(JobsBrokerService broker) {
174 //        int n = JOBS_COUNT;
175 //        return IntStream.range(0, n)
176 //                .mapToObj(i -> newJobAsync(broker))
177 //                .map(this::waitForFutureJob)
178 //                .collect(Collectors.toList());
179 //    }
180 //
181 //    private List<Job> getAlotOfJobs(JobsBrokerService broker) {
182 //        int n = JOBS_COUNT;
183 //        return IntStream.range(0, n)
184 //                .mapToObj(i -> pullJobAsync(broker))
185 //                .map(this::waitForFutureOptionalJob)
186 //                .collect(Collectors.toList());
187 //    }
188 //
189 //    private void pushBackJobs(List<Job> jobs, JobsBrokerService broker) {
190 //        jobs.forEach(job -> pushBackJobAsync(broker, job));
191 //    }
192 //
193 //    private void accountThreadId() {
194 //        threadsIds.add(Thread.currentThread().getId());
195 //    }
196 //
197 //    @AfterMethod
198 //    public void threadsCounter() {
199 //        System.out.println("participating threads count: " + threadsIds.size());
200 //        threadsIds.clear();
201 //    }
202 //
203 //    @BeforeMethod
204 //    public void initializeBroker() {
205 //        broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0);
206 //        ((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
207 //    }
208 //
209 //    @Test
210 //    public void givenSingleJob_getIt_verifySameJob() {
211 //        final Job originalJob = waitForFutureJob(newJobAsync(broker));
212 //
213 //        final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
214 //        assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(originalJob));
215 //    }
216 //
217 //    @Test
218 //    public void givenManyJobs_getJobsAndPushThemBack_alwaysSeeAllOfThemWithPeek() throws InterruptedException {
219 //        final List<Job> originalJobs = putALotOfJobs(broker);
220 //
221 //        MILLISECONDS.sleep(FEW);
222 //        assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
223 //
224 //        final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
225 //
226 //        MILLISECONDS.sleep(FEW);
227 //        assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
228 //
229 //        pushBackJobAsync(broker, retrievedJob);
230 //
231 //        MILLISECONDS.sleep(FEW);
232 //        assertThat(JOBS_PEEKED_SHOULD_MATCH, broker.peek(), containsInAnyOrder(originalJobs.toArray()));
233 //    }
234 //
235 //    @Test
236 //    public void givenManyJobs_getThemAll_verifySameJobs() {
237 //        putAndGetALotOfJobs(broker);
238 //    }
239 //
240 //    @Test
241 //    public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
242 //        final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
243 //
244 //        pushBackJobs(retrievedJobs1, broker);
245 //        final List<Job> retrievedJobs2 = getAlotOfJobs(broker);
246 //
247 //        assertThat(JOBS_SHOULD_MATCH, retrievedJobs2, containsInAnyOrder(retrievedJobs1.toArray()));
248 //    }
249 //
250 //    private static Date toDate(LocalDateTime localDateTime) {
251 //        return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
252 //    }
253 //
254 //    private void setModifiedDateToJob(UUID jobUuid, Date date) {
255 //        DomainVo job = dataAccessService.getDomainObject(JobDaoImpl.class, jobUuid, DaoUtils.getPropsMap());
256 //        job.setModified(date);
257 //        DaoUtils.tryWithSessionAndTransaction(sessionFactory, session -> {
258 //            session.saveOrUpdate(job);
259 //            return 1;
260 //        });
261 //    }
262 //
263 //
264 //    public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date) {
265 //        return createNewJob(indexInBulk, templateId, userId, status, takenBy, date, false);
266 //    }
267 //
268 //    public static JobDaoImpl createNewJob(Integer indexInBulk, UUID templateId, String userId, Job.JobStatus status, String takenBy, LocalDateTime date, boolean deleted){
269 //        JobDaoImpl job = new JobDaoImpl();
270 //        job.setTypeAndData(JobType.NoOp, ImmutableMap.of("x", RandomStringUtils.randomAlphanumeric(15)));
271 //        job.setIndexInBulk(indexInBulk);
272 //        job.setTemplateId(templateId);
273 //        job.setType(JobType.NoOp);
274 //        job.setStatus(status);
275 //        job.setTakenBy(takenBy);
276 //        job.setCreated(toDate(date));
277 //        job.setModified(toDate(date));
278 //        job.setUserId(userId);
279 //        if (deleted) {
280 //            job.setDeletedAt(new Date());
281 //        }
282 //        return job;
283 //    }
284 //
285 //    @DataProvider
286 //    public static Object[][] jobs(Method test) {
287 //        LocalDateTime oldestDate = LocalDateTime.now().minusHours(30);
288 //        UUID sameTemplate = UUID.randomUUID();
289 //        return new Object[][]{
290 //                {ImmutableList.of(
291 //                        createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
292 //                        createNewJob(22, UUID.randomUUID(), "userId", PENDING, null, oldestDate),
293 //                        createNewJob(11, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2)),
294 //                        createNewJob(44, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(5))),
295 //                        4,
296 //                        0,
297 //                        PENDING,
298 //                        "Broker should pull the first pending job by oldest date then by job index"
299 //                },
300 //                { ImmutableList.of(
301 //                        createNewJob(11, UUID.randomUUID(), "userId", COMPLETED,null, oldestDate),
302 //                        createNewJob(11, UUID.randomUUID(), "userId", PENDING,null, oldestDate, DELETED),createNewJob(12, UUID.randomUUID(), "userId", FAILED,null, oldestDate),
303 //                        createNewJob(13, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
304 //                        createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
305 //                        createNewJob(22, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
306 //                        createNewJob(33, UUID.randomUUID(), "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
307 //                  6,
308 //                  5,
309 //                  PENDING,
310 //                  "Broker should pull the only pending - first pending job by oldest job - ignore deleted,completed, failed, in-progress and stopped statuses"
311 //                },
312 //                {ImmutableList.of(
313 //                        createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
314 //                        createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
315 //                        createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
316 //                        2,
317 //                        -1,
318 //                        PENDING,
319 //                        "Broker should not pull any job when it exceeded mso limit with count (in-progress) statuses"
320 //                },
321 //                {ImmutableList.of(
322 //                        createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
323 //                        createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
324 //                        createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
325 //                        2,
326 //                        -1,
327 //                        PENDING,
328 //                        "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
329 //                },
330 //                {ImmutableList.of(
331 //                        createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
332 //                        createNewJob(22, UUID.randomUUID(), "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
333 //                        createNewJob(33, UUID.randomUUID(), "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
334 //                        3,
335 //                        2,
336 //                        PENDING,
337 //                        "Broker should pull first job when it doesn't exceeded mso limit with count(in-progress or pending && taken) statuses"
338 //                },
339 //                {ImmutableList.of(
340 //                        createNewJob(11, sameTemplate, "userId", PENDING, UUID.randomUUID().toString(), oldestDate),
341 //                        createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
342 //                        createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
343 //                        3,
344 //                        -1,
345 //                        PENDING,
346 //                        "Broker should not pull any job when there is another job from this template that was taken"
347 //                },
348 //                {ImmutableList.of(
349 //                        createNewJob(11, sameTemplate, "userId", IN_PROGRESS, null, oldestDate),
350 //                        createNewJob(22, sameTemplate, "userId", PENDING, null, oldestDate),
351 //                        createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
352 //                        3,
353 //                        -1,
354 //                        PENDING,
355 //                        "Broker should not pull any job when there is another job from this template that in progress"
356 //                },
357 //                {ImmutableList.of(
358 //                        createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate),
359 //                        createNewJob(22, sameTemplate, "userId", STOPPED, null, oldestDate),
360 //                        createNewJob(33, sameTemplate, "userId", PENDING, null, LocalDateTime.now().minusHours(2))),
361 //                        3,
362 //                        -1,
363 //                        PENDING,
364 //                        "Broker should not pull any job when there is another job from this template that was failed"
365 //                },
366 //                {ImmutableList.of(
367 //                        createNewJob(11, sameTemplate, "userId", FAILED, null, oldestDate, DELETED),
368 //                        createNewJob(22, sameTemplate, "userId", STOPPED,null, oldestDate),
369 //                        createNewJob(33, sameTemplate, "userId", PENDING,null, LocalDateTime.now().minusHours(2))),
370 //                   3,
371 //                   2,
372 //                   PENDING,
373 //                   "Broker should pull pending job when there is another job from this template that was deleted, although failed"
374 //                },
375 //                { ImmutableList.of(
376 //                        createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, null, oldestDate),
377 //                        createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
378 //                        createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
379 //                        3,
380 //                        2,
381 //                        PENDING,
382 //                        "Broker should prioritize jobs of user that has no in-progress jobs"
383 //                },
384 //                {ImmutableList.of(
385 //                        createNewJob(11, UUID.randomUUID(), "userA", PENDING, UUID.randomUUID().toString(), oldestDate),
386 //                        createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, oldestDate),
387 //                        createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, LocalDateTime.now().minusHours(2))),
388 //                        3,
389 //                        2,
390 //                        PENDING,
391 //                        "Broker should prioritize jobs of user that has no taken jobs"
392 //                },
393 //                {ImmutableList.of(
394 //                        createNewJob(11, UUID.randomUUID(), "userA", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
395 //                        createNewJob(22, UUID.randomUUID(), "userA", PENDING, null, LocalDateTime.now().minusHours(2)),
396 //                        createNewJob(31, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
397 //                        createNewJob(32, UUID.randomUUID(), "userB", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
398 //                        createNewJob(33, UUID.randomUUID(), "userB", PENDING, null, oldestDate)),
399 //                        5,
400 //                        4,
401 //                        PENDING,
402 //                        "Broker should take oldest job when there is one in-progress job to each user"
403 //                },
404 //                {ImmutableList.of(
405 //                        createNewJob(11, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
406 //                        createNewJob(22, UUID.randomUUID(), UUID.randomUUID().toString(), IN_PROGRESS, null, oldestDate),
407 //                        createNewJob(33, UUID.randomUUID(), UUID.randomUUID().toString(), PENDING, null, LocalDateTime.now().minusHours(2))),
408 //                        2,
409 //                        -1,
410 //                        PENDING,
411 //                        "Broker should not pull any job when it exceeded mso limit with count(in-progress or pending && taken) statuses"
412 //                },
413 //                {ImmutableList.of(
414 //                        createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, UUID.randomUUID().toString(), oldestDate),
415 //                        createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, oldestDate),
416 //                        createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(2)),
417 //                        createNewJob(44, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusHours(5))),
418 //                        20,
419 //                        1,
420 //                        IN_PROGRESS,
421 //                        "Broker with in progress topic should pull the first in progress and not taken job by oldest date"
422 //                },
423 //                {ImmutableList.of(
424 //                        createNewJob(11, UUID.randomUUID(), "userId", COMPLETED, null, oldestDate),
425 //                        createNewJob(12, UUID.randomUUID(), "userId", FAILED, null, oldestDate),
426 //                        createNewJob(13, UUID.randomUUID(), "userId", PENDING,null, oldestDate),
427 //                        createNewJob(14, UUID.randomUUID(), "userId", STOPPED,null, oldestDate),
428 //                        createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate, DELETED),createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS,null, oldestDate),
429 //                        createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS,null, LocalDateTime.now().minusHours(2))),
430 //                  20,
431 //                  5,
432 //                  IN_PROGRESS,
433 //                  "Broker with in progress topic should pull only in-progress jobs - first in-progress job by oldest date - ignore deleted,completed, failed, pending and stopped statuses"
434 //                },
435 //                {ImmutableList.of(
436 //                        createNewJob(11, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now()),
437 //                        createNewJob(22, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(1)),
438 //                        createNewJob(33, UUID.randomUUID(), "userId", IN_PROGRESS, null, LocalDateTime.now().minusSeconds(2))),
439 //                        20,
440 //                        -1,
441 //                        IN_PROGRESS,
442 //                        "Broker with in progress topic should not pull any job if its modified date is smaller than now-interval (20 seconds)"
443 //                }
444 //
445 //        };
446 //    }
447 //
448 //
449 //    @Test(dataProvider = "jobs")
450 //    public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<JobDaoImpl> jobs, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
451 //        JobsBrokerServiceInDatabaseImpl broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20);
452 //        for (JobDaoImpl job : jobs) {
453 //            Date modifiedDate = job.getModified();
454 //            broker.add(job);
455 //            setModifiedDateToJob(job.getUuid(), modifiedDate);
456 //        }
457 //        Optional<Job> nextJob = broker.pull(topic, UUID.randomUUID().toString());
458 //        boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
459 //        Assert.assertEquals(nextJob.isPresent(), shouldAnyBeSelected, assertionReason);
460 //        if (shouldAnyBeSelected) {
461 //            Assert.assertEquals(jobs.get(expectedIndexSelected), nextJob.get(), assertionReason);
462 //        }
463 //    }
464 //
465 //    @DataProvider
466 //    public Object[][] topics() {
467 //        return Arrays.stream(Job.JobStatus.values())
468 //                .filter(not(t -> ImmutableList.of(PENDING, IN_PROGRESS).contains(t)))
469 //                .map(v -> new Object[]{v}).collect(Collectors.toList()).toArray(new Object[][]{});
470 //    }
471 //
472 //    @Test(dataProvider = "topics", expectedExceptions = GenericUncheckedException.class, expectedExceptionsMessageRegExp = "Unsupported topic.*")
473 //    public void pullUnexpectedTopic_exceptionIsThrown(Job.JobStatus topic) {
474 //        broker.pull(topic, UUID.randomUUID().toString());
475 //    }
476 //
477 //    @Test(expectedExceptions = NoJobException.class)
478 //    public void givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved() {
479 //        Stream.of(Job.JobStatus.values())
480 //                .filter(not(s -> s.equals(PENDING)))
481 //                .map(s -> createMockJob("some user id", s))
482 //                .map(job -> newJobAsync(broker, job))
483 //                .map(this::waitForFutureJob)
484 //                .collect(Collectors.toList());
485 //
486 //        waitForFutureOptionalJob(pullJobAsync(broker));
487 //    }
488 //
489 //    @Test
490 //    public void givenPendingAndNonPendingJobs_getJobAsPendingTopic_verifyAJobRetrieved() {
491 //        newJobAsync(broker); // adding this job negates the expected result (NoJobException) of the call below
492 //        givenNonPendingJobs_getJobAsPendingTopic_verifyNothingRetrieved();
493 //    }
494 //
495 //    @Test(expectedExceptions = NoJobException.class)
496 //    public void givenManyJobs_pullThemAllAndAskOneMore_verifyFinallyNothingRetrieved() {
497 //        putAndGetALotOfJobs(broker);
498 //        waitForFutureOptionalJob(pullJobAsync(broker));
499 //    }
500 //
501 //    @Test(expectedExceptions = NoJobException.class)
502 //    public void givenNoJob_requestJob_verifyNothingRetrieved() throws InterruptedException, ExecutionException, TimeoutException {
503 //        final Future<Optional<Job>> futureOptionalJob = pullJobAsync(broker);
504 //        assertThat("job should not be waiting yet", futureOptionalJob.get(FEW, MILLISECONDS).isPresent(), is(false));
505 //        waitForFutureOptionalJob(futureOptionalJob);
506 //    }
507 //
508 //    @Test(expectedExceptions = IllegalStateException.class)
509 //    public void givenSinglePulledJob_pushBackDifferentJob_verifyPushingRejected() {
510 //        waitForFutureJob(newJobAsync(broker));
511 //        waitForFutureJob(newJobAsync(broker));
512 //        waitForFutureOptionalJob(pullJobAsync(broker));
513 //
514 //        Job myJob = createMockJob("user id");
515 //        myJob.setUuid(UUID.randomUUID());
516 //
517 //        broker.pushBack(myJob); //Should fail
518 //    }
519 //
520 //    @Test
521 //    public void givenSingleJob_pushBackModifiedJob_verifyPulledIsVeryVeryTheSame() {
522 //        final ImmutableMap<String, Object> randomDataForMostRecentJobType =
523 //                ImmutableMap.of("42", 42, "complex", ImmutableList.of("a", "b", "c"));
524 //
525 //        waitForFutureJob(newJobAsync(broker));
526 //        final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
527 //
528 //        job.setStatus(Job.JobStatus.PENDING);
529 //        job.setTypeAndData(JobType.NoOp, ImmutableMap.of("good", "morning"));
530 //        job.setTypeAndData(JobType.HttpCall, ImmutableMap.of());
531 //        job.setTypeAndData(JobType.ServiceInstantiation, randomDataForMostRecentJobType);
532 //
533 //        broker.pushBack(job);
534 //        final Job retrievedJob = waitForFutureOptionalJob(pullJobAsync(broker));
535 //
536 //        assertThat(JOBS_SHOULD_MATCH, retrievedJob, is(job));
537 //        assertThat(JOBS_SHOULD_MATCH, retrievedJob.getData(), both(equalTo(job.getData())).and(equalTo(randomDataForMostRecentJobType)));
538 //        assertThat(JOBS_SHOULD_MATCH, jobDataReflected(retrievedJob), is(jobDataReflected(job)));
539 //    }
540 //
541 //    private static String jobDataReflected(Job job) {
542 //        return new ReflectionToStringBuilder(job, ToStringStyle.SHORT_PREFIX_STYLE)
543 //                .setExcludeFieldNames("created", "modified", "takenBy")
544 //                .toString();
545 //    }
546 //
547 //    @Test(expectedExceptions = IllegalStateException.class)
548 //    public void givenSingleJob_pushBackTwice_verifyPushingRejected() {
549 //        waitForFutureJob(newJobAsync(broker));
550 //        final Job job = waitForFutureOptionalJob(pullJobAsync(broker));
551 //
552 //        broker.pushBack(job);
553 //        broker.pushBack(job); //Should fail
554 //    }
555 //
556 //    @Test
557 //    public void addJob_PeekItById_verifySameJobWasPeeked() {
558 //        String userId = UUID.randomUUID().toString();
559 //        Job myJob = createMockJob(userId);
560 //        UUID uuid = broker.add(myJob);
561 //        Job peekedJob = broker.peek(uuid);
562 //        assertEquals("added testId is not the same as peeked TestsId",
563 //                userId,
564 //                peekedJob.getData().get("userId"));
565 //    }
566 //
567 //    @Test(dataProvider = "jobStatusesForSuccessDelete", expectedExceptions = NoJobException.class)
568 //       public void givenOneJob_deleteIt_canPeekOnItButCantPull(Job.JobStatus status) {
569 //        final Job job = waitForFutureJob(newJobAsync(broker, status));
570 //        broker.delete(job.getUuid());
571 //        assertNotNull(((JobDaoImpl) broker.peek(job.getUuid())).getDeletedAt(), "job should be deleted");
572 //        waitForFutureOptionalJob(pullJobAsync(broker));
573 //    }
574 //
575 //    @DataProvider
576 //    public static Object[][] jobStatusesForSuccessDelete() {
577 //        return new Object[][]{
578 //                {PENDING},
579 //                {STOPPED}
580 //        };
581 //    }
582 //
583 //    @Test(
584 //            dataProvider = "jobStatusesForFailedDelete",
585 //            expectedExceptions = OperationNotAllowedException.class,
586 //            expectedExceptionsMessageRegExp=DELETE_SERVICE_INFO_STATUS_EXCEPTION_MESSAGE
587 //    )
588 //    public void deleteJob_notAllowedStatus_exceptionIsThrown(Job.JobStatus status, boolean taken) {
589 //        final Job job = waitForFutureJob(newJobAsync(broker, createMockJob("some user id", status)));
590 //
591 //        if (taken) {
592 //            waitForFutureOptionalJob(pullJobAsync(broker));
593 //        }
594 //
595 //
596 //        broker.delete(job.getUuid());
597 //    }
598 //
599 //    @DataProvider
600 //    public static Object[][] jobStatusesForFailedDelete() {
601 //        return new Object[][]{
602 //                {PENDING, true},
603 //                {IN_PROGRESS, false},
604 //                {COMPLETED, false},
605 //                {PAUSE, false},
606 //                {FAILED, false},
607 //        };
608 //    }
609 //
610 //    @Test(expectedExceptions = OperationNotAllowedException.class, expectedExceptionsMessageRegExp = DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE)
611 //    public void deleteJob_notExist_exceptionIsThrown() {
612 //        waitForFutureJob(newJobAsync(broker, createMockJob("some user id", PENDING)));
613 //        broker.delete(new UUID(111, 111));
614 //    }
615 //
616 //}