package org.onap.vid.services;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.mockito.Mockito.when;
+import static org.onap.vid.job.Job.JobStatus.COMPLETED;
+import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_ERRORS;
+import static org.onap.vid.job.Job.JobStatus.COMPLETED_WITH_NO_ACTION;
+import static org.onap.vid.job.Job.JobStatus.CREATING;
+import static org.onap.vid.job.Job.JobStatus.FAILED;
+import static org.onap.vid.job.Job.JobStatus.IN_PROGRESS;
+import static org.onap.vid.job.Job.JobStatus.PAUSE;
+import static org.onap.vid.job.Job.JobStatus.PENDING;
+import static org.onap.vid.job.Job.JobStatus.PENDING_RESOURCE;
+import static org.onap.vid.job.Job.JobStatus.RESOURCE_IN_PROGRESS;
+import static org.onap.vid.job.Job.JobStatus.STOPPED;
+import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
+import static org.onap.vid.utils.Streams.not;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.lang.reflect.Method;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import javax.inject.Inject;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.onap.portalsdk.core.domain.support.DomainVo;
import org.onap.portalsdk.core.service.DataAccessService;
import org.onap.portalsdk.core.util.SystemProperties;
+import org.onap.vid.config.DataSourceConfig;
+import org.onap.vid.config.JobAdapterConfig;
import org.onap.vid.exceptions.GenericUncheckedException;
import org.onap.vid.exceptions.OperationNotAllowedException;
import org.onap.vid.job.Job;
import org.onap.vid.job.JobAdapter;
+import org.onap.vid.job.JobAdapter.AsyncJobRequest;
import org.onap.vid.job.JobType;
import org.onap.vid.job.JobsBrokerService;
import org.onap.vid.job.command.JobCommandFactoryTest;
import org.onap.vid.job.impl.JobDaoImpl;
import org.onap.vid.job.impl.JobSchedulerInitializer;
+import org.onap.vid.job.impl.JobSharedData;
import org.onap.vid.job.impl.JobsBrokerServiceInDatabaseImpl;
-import org.onap.vid.services.VersionService;
import org.onap.vid.utils.DaoUtils;
-import org.onap.vid.config.DataSourceConfig;
-import org.onap.vid.config.JobAdapterConfig;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-
-import javax.inject.Inject;
-import java.lang.reflect.Method;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.stream.Collectors.toList;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.both;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.mockito.Mockito.when;
-import static org.onap.vid.job.Job.JobStatus.*;
-import static org.onap.vid.utils.Streams.not;
-import static org.onap.vid.testUtils.TestUtils.generateRandomAlphaNumeric;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertFalse;
+import org.togglz.core.manager.FeatureManager;
@ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, JobAdapterConfig.class})
public class JobsBrokerServiceTest extends AbstractTestNGSpringContextTests {
private final Set<Long> threadsIds = new ConcurrentSkipListSet<>();
- private final long FEW = 500;
+ private final long FEW = 1000;
+ private final long SOME = 2000;
private final String JOBS_SHOULD_MATCH = "the jobs that added and those that pulled must be the same";
private final String JOBS_PEEKED_SHOULD_MATCH = "the jobs that added and those that peeked must be the same";
private static final String DELETE_SERVICE_NOT_EXIST_EXCEPTION_MESSAGE = "Service does not exist";
private JobsBrokerService broker;
+ @Mock
+ private FeatureManager featureManager;
+
@Inject
JobAdapter jobAdapter;
@Inject
public void initializeBroker() {
MockitoAnnotations.initMocks(this);
when(versionService.retrieveBuildNumber()).thenReturn("aBuildNumber");
- broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService);
+ broker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, 200, 0, versionService, featureManager);
((JobsBrokerServiceInDatabaseImpl) broker).deleteAll();
}
private Job waitForFutureOptionalJob(Future<Optional<Job>> retrievedOptionalJobFuture) {
try {
- return retrievedOptionalJobFuture.get(FEW, MILLISECONDS).orElseThrow(NoJobException::new);
+ return retrievedOptionalJobFuture.get(SOME, MILLISECONDS).orElseThrow(NoJobException::new);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
private Job waitForFutureJob(Future<Job> retrievedJobFuture) {
try {
- return retrievedJobFuture.get(FEW, MILLISECONDS);
+ return retrievedJobFuture.get(SOME, MILLISECONDS);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@Test
- public void givenManyJobs_getThemAllThenPushBackandGet_verifySameJobs() {
+ public void givenManyJobs_getThemAllThenPushBackAndGet_verifySameJobs() {
final List<Job> retrievedJobs1 = putAndGetALotOfJobs(broker);
pushBackJobs(retrievedJobs1, broker);
@Test(dataProvider = "jobs")
public void givenSomeJobs_pullNextJob_returnNextOrNothingAsExpected(List<Jobber> jobbers, int msoLimit, int expectedIndexSelected, Job.JobStatus topic, String assertionReason) {
- JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService);
+ JobsBrokerServiceInDatabaseImpl aBroker = new JobsBrokerServiceInDatabaseImpl(dataAccessService, sessionFactory, msoLimit, 20, versionService, featureManager);
final List<JobDaoImpl> jobs = addJobsWithModifiedDate(jobbers, aBroker);
Optional<Job> nextJob = aBroker.pull(topic, UUID.randomUUID().toString());
boolean shouldAnyBeSelected = expectedIndexSelected >= 0;
broker.delete(new UUID(111, 111));
}
+ public static class MockAsyncRequest implements AsyncJobRequest {
+ public String value;
+
+ public MockAsyncRequest() {}
+
+ public MockAsyncRequest(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+
+ @Test
+ public void twoJobsWithSamePosition_bothJobsArePulled(){
+ UUID uuid = UUID.randomUUID();
+ int positionInBulk = RandomUtils.nextInt();
+ String userId = "userId";
+
+ Optional<Job> firstPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "first value");
+ Optional<Job> secondPulledJob = createAddAndPullJob(uuid, positionInBulk, userId, "second value");
+
+ MockAsyncRequest firstValue = (MockAsyncRequest) firstPulledJob.get().getSharedData().getRequest();
+ MockAsyncRequest secondValue = (MockAsyncRequest) secondPulledJob.get().getSharedData().getRequest();
+ assertThat(ImmutableList.of(firstValue.value, secondValue.value),
+ containsInAnyOrder("first value", "second value"));
+ }
+
+ private Optional<Job> createAddAndPullJob(UUID uuid, int positionInBulk, String userId, String s) {
+ JobDaoImpl job1 = createNewJob(positionInBulk, uuid, userId, CREATING, null,
+ LocalDateTime.now().minusSeconds(1), false);
+ job1.setSharedData(new JobSharedData(null, userId, new MockAsyncRequest(s), "testApi"));
+ broker.add(job1);
+ return broker.pull(CREATING, userId);
+ }
}