1 package org.onap.vid.job.impl;
3 import com.google.common.collect.ImmutableList;
4 import com.google.common.collect.ImmutableMap;
5 import org.apache.commons.lang3.StringUtils;
6 import org.mockito.Mockito;
7 import org.onap.portalsdk.core.service.DataAccessService;
8 import org.onap.portalsdk.core.util.SystemProperties;
9 import org.onap.vid.asdc.AsdcCatalogException;
10 import org.onap.vid.config.DataSourceConfig;
11 import org.onap.vid.config.JobCommandsConfigWithMockedMso;
12 import org.onap.vid.config.MockedAaiClientAndFeatureManagerConfig;
13 import org.onap.vid.job.Job;
14 import org.onap.vid.job.Job.JobStatus;
15 import org.onap.vid.job.JobType;
16 import org.onap.vid.job.JobsBrokerService;
17 import org.onap.vid.job.command.CommandUtils;
18 import org.onap.vid.job.command.InternalState;
19 import org.onap.vid.model.Action;
20 import org.onap.vid.model.NameCounter;
21 import org.onap.vid.model.RequestReferencesContainer;
22 import org.onap.vid.model.ServiceInfo;
23 import org.onap.vid.model.serviceInstantiation.ServiceInstantiation;
24 import org.onap.vid.mso.RestMsoImplementation;
25 import org.onap.vid.mso.RestObject;
26 import org.onap.vid.mso.model.RequestReferences;
27 import org.onap.vid.mso.rest.AsyncRequestStatus;
28 import org.onap.vid.properties.Features;
29 import org.onap.vid.services.AsyncInstantiationBaseTest;
30 import org.onap.vid.services.AsyncInstantiationBusinessLogic;
31 import org.onap.vid.utils.DaoUtils;
32 import org.springframework.test.context.ContextConfiguration;
33 import org.testng.annotations.BeforeClass;
34 import org.testng.annotations.BeforeMethod;
35 import org.testng.annotations.DataProvider;
36 import org.togglz.core.manager.FeatureManager;
38 import javax.inject.Inject;
39 import javax.ws.rs.ProcessingException;
40 import java.lang.reflect.Method;
42 import java.util.function.BiConsumer;
43 import java.util.function.Supplier;
44 import java.util.stream.Collectors;
45 import java.util.stream.IntStream;
46 import java.util.stream.Stream;
48 import static java.util.stream.Collectors.*;
49 import static net.javacrumbs.jsonunit.JsonMatchers.jsonPartEquals;
50 import static org.hamcrest.CoreMatchers.*;
51 import static org.hamcrest.MatcherAssert.assertThat;
52 import static org.hamcrest.Matchers.hasProperty;
53 import static org.hamcrest.Matchers.hasSize;
54 import static org.hamcrest.core.Every.everyItem;
55 import static org.mockito.ArgumentMatchers.any;
56 import static org.mockito.ArgumentMatchers.endsWith;
57 import static org.mockito.ArgumentMatchers.*;
58 import static org.mockito.Mockito.*;
59 import static org.onap.vid.job.Job.JobStatus.*;
60 import static org.onap.vid.model.JobAuditStatus.SourceStatus.MSO;
61 import static org.onap.vid.model.JobAuditStatus.SourceStatus.VID;
62 import static org.testng.AssertJUnit.*;
64 // This is closer to an integration test than to a unit test,
65 // but the scenario is very hard to cover in an API test, so it is written as a unit test.
66 @ContextConfiguration(classes = {DataSourceConfig.class, SystemProperties.class, MockedAaiClientAndFeatureManagerConfig.class, JobCommandsConfigWithMockedMso.class})
67 public class AsyncInstantiationIntegrationTest extends AsyncInstantiationBaseTest {
// MSO request-state strings: used when mocking MSO status responses and when asserting MSO audit statuses.
69 private static final String FAILED_STR = "FAILED";
70 private static final String COMPLETE_STR = "COMPLETE";
71 private static final String IN_PROGRESS_STR = "IN_PROGRESS";
72 private static final String REQUESTED = "REQUESTED";
73 private static final String PENDING_MANUAL_TASK = "PENDING_MANUAL_TASK";
// Raw payload set on every RestObject built by createResponse(...).
74 public static final String RAW_DATA_FROM_MSO = "RAW DATA FROM MSO";
// User id passed to pushBulkJob(...) by the push helpers of this class.
75 private static String USER_ID = "123";
// Default request / instance ids used by createResponse(int); random values generated once, at class initialization.
76 public static String REQUEST_ID = UUID.randomUUID().toString();
77 public static String SERVICE_INSTANCE_ID = UUID.randomUUID().toString();
// Collaborators used by the tests below (restMso, featureManager and commandUtils are stubbed with Mockito when(...)).
// NOTE(review): the injection annotations are not visible in this excerpt — confirm how these fields are wired.
80 private JobsBrokerService jobsBrokerService;
83 private JobWorker jobWorker;
86 private FeatureManager featureManager;
89 private AsyncInstantiationBusinessLogic asyncInstantiationBL;
92 private RestMsoImplementation restMso;
95 private DataAccessService dataAccessService;
98 private CommandUtils commandUtils;
// Fixture setup: calls createInstanceParamsMaps(), deletes all ServiceInfo, JobDaoImpl and
// NameCounter rows (the "1=1" filter matches every row) and calls mockAaiClientAnyNameFree().
// NOTE(review): source lines 103-106 and 110-113 are not visible in this excerpt, so these
// statements may belong to more than one setup method — confirm against the full file.
101 void initServicesInfoService() {
102 createInstanceParamsMaps();
107 dataAccessService.deleteDomainObjects(ServiceInfo.class, "1=1", DaoUtils.getPropsMap());
108 dataAccessService.deleteDomainObjects(JobDaoImpl.class, "1=1", DaoUtils.getPropsMap());
109 dataAccessService.deleteDomainObjects(NameCounter.class, "1=1", DaoUtils.getPropsMap());
114 mockAaiClientAnyNameFree();
// Asserts that every ServiceInfo returned by getAllServicesInfo() has jobStatus PENDING.
// NOTE(review): the statement that pushes the bulk (source lines 119-120) is not visible in this excerpt.
118 public void whenPushNewBulk_thenAllServicesAreInPending() {
121 List<ServiceInfo> serviceInfoList = asyncInstantiationBL.getAllServicesInfo();
122 assertThat( serviceInfoList, everyItem(hasProperty("jobStatus", is(PENDING))));
125 private List<UUID> pushMacroBulk() {
126 ServiceInstantiation serviceInstantiation = generateMockMacroServiceInstantiationPayload(false,
127 createVnfList(instanceParamsMapWithoutParams, Collections.EMPTY_LIST, true),
128 3, true,PROJECT_NAME, true);
129 return asyncInstantiationBL.pushBulkJob(serviceInstantiation, USER_ID);
132 private UUID pushALaCarteWithVnf() {
133 ServiceInstantiation serviceInstantiation = generateALaCarteWithVnfsServiceInstantiationPayload();
134 List<UUID> uuids = asyncInstantiationBL.pushBulkJob(serviceInstantiation, USER_ID);
135 assertThat(uuids, hasSize(1));
139 private UUID pushALaCarteUpdateWithGroups() {
140 ServiceInstantiation serviceInstantiation = generateALaCarteUpdateWith1ExistingGroup2NewGroupsPayload();
141 List<UUID> uuids = asyncInstantiationBL.pushBulkJob(serviceInstantiation, USER_ID);
142 assertThat(uuids, hasSize(1));
146 public static RestObject<RequestReferencesContainer> createResponse(int statusCode) {
147 return createResponse(statusCode, SERVICE_INSTANCE_ID, REQUEST_ID);
150 public static RestObject<RequestReferencesContainer> createResponse(int statusCode, String instanceId, String requestId) {
151 RequestReferences requestReferences = new RequestReferences();
152 requestReferences.setRequestId(requestId);
153 requestReferences.setInstanceId(instanceId);
154 RestObject<RequestReferencesContainer> restObject = new RestObject<>();
155 restObject.set(new RequestReferencesContainer(requestReferences));
156 restObject.setStatusCode(statusCode);
157 restObject.setRaw(RAW_DATA_FROM_MSO);
161 ImmutableList<String> statusesToStrings(JobStatus... jobStatuses) {
162 return Stream.of(jobStatuses).map(
163 Enum::toString).collect(ImmutableList.toImmutableList());
167 Make sure the service state is in progress once the request has been sent to MSO
168 Make sure the service state is in progress once the request has been sent to MSO and the MSO status is in_progress
169 Make sure the service state is Failed once we get a failure state from MSO, and that its jobs are not collected any more.
170 Make sure the service state is Completed successfully once we get complete from MSO, and that the next job is peeked.
171 Once a service in the bulk has failed, the other services are moved to Stopped, and no other jobs from the bulk are peeked.
// Walks a macro bulk through its life cycle against a mocked MSO:
//  - the first pulled job goes PENDING -> IN_PROGRESS and stays IN_PROGRESS while MSO throws a
//    ProcessingException, reports IN_PROGRESS, reports an unknown state, or answers 404;
//  - it becomes COMPLETED once MSO reports COMPLETE;
//  - a second job is then started and fails when MSO reports FAILED, after which every service
//    that was still PENDING is expected to be STOPPED;
//  - finally the MSO and VID audit statuses of each pushed job are compared with the expected lists
//    and no job may be left to pull on the PENDING / IN_PROGRESS topics.
// NOTE(review): some short source lines (e.g. 180-181, 185-187, 237-238, 246-247) are not visible
// in this excerpt — confirm the full expected-status lists against the complete file.
174 public void testStatusesOfMacroServiceInBulkDuringBulkLifeCycle() {
175 when(restMso.PostForObject(any(), any(), eq(RequestReferencesContainer.class))).thenReturn(createResponse(200));
// expected VID audit trail per pushed job, in push order
176 ImmutableList<ImmutableList<String>> expectedStatusesForVid = ImmutableList.of(
177 statusesToStrings(PENDING, IN_PROGRESS, COMPLETED),
178 statusesToStrings(PENDING, IN_PROGRESS, FAILED),
179 statusesToStrings(PENDING, STOPPED)
// expected MSO audit trail per pushed job, in push order
182 ImmutableList<ImmutableList<String>> expectedStatusesForMso = ImmutableList.of(
183 ImmutableList.of(REQUESTED, IN_PROGRESS_STR, "not a state", FAILED_STR ,COMPLETE_STR),
184 ImmutableList.of(REQUESTED, FAILED_STR),
188 List<UUID> uuids = pushMacroBulk();
189 pullPendingJobAndAssertJobStatus(JobStatus.IN_PROGRESS, PENDING);
191 //assert that when get ProcessingException from restMso, status remain the same
192 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).thenThrow(new ProcessingException("fake message"));
193 Job job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS);
194 UUID firstHandledJobUUID = job.getUuid();
195 listServicesAndAssertStatus(JobStatus.IN_PROGRESS, PENDING, job);
197 //assert that when get IN_PROGRESS status from restMso, status remain IN_PROGRESS
198 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).thenReturn(asyncRequestStatusResponseAsRestObject(IN_PROGRESS_STR));
199 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS);
200 listServicesAndAssertStatus(JobStatus.IN_PROGRESS, PENDING, job);
202 //assert that when get unrecognized status from restMso, status remain IN_PROGRESS
203 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).thenReturn(asyncRequestStatusResponseAsRestObject("not a state"));
204 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS);
205 listServicesAndAssertStatus(JobStatus.IN_PROGRESS, PENDING, job);
207 //assert that when get non 200 status code during IN_PROGRESS, status remain IN_PROGRESS
208 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR, 404));
209 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS);
210 listServicesAndAssertStatus(JobStatus.IN_PROGRESS, PENDING, job);
212 //when get job COMPLETE from MSO, service status become COMPLETED
213 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).
214 thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
215 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, COMPLETED);
216 List<ServiceInfo> serviceInfoList = listServicesAndAssertStatus(COMPLETED, PENDING, job);
219 //for use later in the test
220 Map<UUID, JobStatus> expectedJobStatusMap = serviceInfoList.stream().collect(
221 Collectors.toMap(ServiceInfo::getJobId, x-> PENDING));
222 expectedJobStatusMap.put(job.getUuid(), COMPLETED);
224 //when handling another PENDING job, statuses are : COMPLETED, IN_PROGRESS, PENDING
225 job = pullJobProcessAndPushBack(PENDING, JobStatus.IN_PROGRESS);
226 assertThat(job.getUuid(), not(equalTo(firstHandledJobUUID))); //assert different job was handled now
227 expectedJobStatusMap.put(job.getUuid(), JobStatus.IN_PROGRESS);
228 listServicesAndAssertStatus(expectedJobStatusMap);
230 //when get FAILED status from MSO statuses are : COMPLETED, FAILED, STOPPED
231 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).
232 thenReturn(asyncRequestStatusResponseAsRestObject(FAILED_STR));
233 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.FAILED);
234 expectedJobStatusMap.put(job.getUuid(), JobStatus.FAILED);
// every entry that is still PENDING is rewritten to STOPPED; other entries keep their status
235 expectedJobStatusMap = expectedJobStatusMap.entrySet().stream().collect(Collectors.toMap(
236 e -> e.getKey(), e -> e.getValue() == PENDING ? JobStatus.STOPPED : e.getValue()
239 listServicesAndAssertStatus(expectedJobStatusMap);
// compare the recorded audit statuses of the i-th pushed job with the i-th expected list
240 IntStream.range(0, uuids.size()).forEach(i -> {
241 UUID uuid = uuids.get(i);
242 List<String> msoStatuses = asyncInstantiationBL.getAuditStatuses(uuid, MSO).stream().map(x -> x.getJobStatus()).collect(Collectors.toList());
243 List<String> vidStatuses = asyncInstantiationBL.getAuditStatuses(uuid, VID).stream().map(x -> x.getJobStatus()).collect(Collectors.toList());
244 assertThat(msoStatuses, is(expectedStatusesForMso.get(i)));
245 assertThat(vidStatuses, is(expectedStatusesForVid.get(i)));
// nothing may be left to pull on either topic
248 assertFalse(jobsBrokerService.pull(PENDING, randomUuid()).isPresent());
249 assertFalse(jobsBrokerService.pull(JobStatus.IN_PROGRESS, randomUuid()).isPresent());
254 public static Object[][] AlaCarteStatuses(Method test) {
255 return new Object[][]{
256 {COMPLETE_STR, JobStatus.COMPLETED, JobStatus.COMPLETED},
257 {FAILED_STR, JobStatus.COMPLETED_WITH_ERRORS, JobStatus.FAILED},
262 Make sure the service state is in progress once the request has been sent to MSO
263 Make sure the service state is watching until the state changes to completed
264 Make sure the service state is watching until the vnf state changes to completed
265 Make sure the service state is Completed successfully once we get complete from MSO for the vnf job.
268 //@Test(dataProvider = "AlaCarteStatuses")
// A-la-carte life cycle of a service with one VNF, with FLAG_ASYNC_ALACARTE_VNF on and
// FLAG_ASYNC_ALACARTE_VFMODULE off.
//   msoVnfStatus          - status string the mocked MSO reports for the VNF request
//   expectedServiceStatus - status asserted for the service job at the end
//   expectedVnfStatus     - status asserted for the VNF job after MSO answered
// Note: the local SERVICE_INSTANCE_ID declared below shadows the class-level constant of the same name.
269 public void testStatusesOfServiceDuringALaCarteLifeCycleIgnoringVfModules(String msoVnfStatus, JobStatus expectedServiceStatus, JobStatus expectedVnfStatus) {
271 [v] + push alacarte with 1 vnf
272 [v] verify STATUS pending
273 [v] + pull+execute (should post to MSO)
274 [v] verify STATUS in progress
275 [v] + pull+execute (should GET completed from MSO)
276 [v] verify STATUS in progress; TYPE watching
277 [v] verify job#2 *new* VNF job STATUS creating
278 [v] + pull+execute job#2 (should post to MSO)
279 [v] verify job#2 STATUS resource in progress
280 [v] verify job#1 STATUS in progress
281 [v] + pull+execute job#2 (should GET completed from MSO)
282 [v] verify job#2 STATUS completed
283 [v] + pull+execute job#1
284 [v] verify job#1 STATUS completed
286 * not looking on audit (yet)
288 when(featureManager.isActive(Features.FLAG_ASYNC_ALACARTE_VNF)).thenReturn(true);
289 when(featureManager.isActive(Features.FLAG_ASYNC_ALACARTE_VFMODULE)).thenReturn(false);
// fresh ids per run, so the endsWith(...) matchers below only match this test's requests
290 final String SERVICE_REQUEST_ID = UUID.randomUUID().toString();
291 final String SERVICE_INSTANCE_ID = UUID.randomUUID().toString();
292 final String VNF_REQUEST_ID = UUID.randomUUID().toString();
295 //push alacarte with 1 vnf, verify STATUS pending
296 UUID uuid = pushALaCarteWithVnf();
297 singleServicesAndAssertStatus(JobStatus.PENDING, uuid);
299 //mock mso to answer 200 of create service instance request, verify STATUS in progress
300 when(restMso.PostForObject(any(), endsWith("serviceInstances"), eq(RequestReferencesContainer.class))).thenReturn(
301 createResponse(200, SERVICE_INSTANCE_ID, SERVICE_REQUEST_ID));
302 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.PENDING, JobStatus.IN_PROGRESS, JobType.InProgressStatus);
303 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
305 //mock mso to answer COMPLETE for service instance create, job status shall remain IN_PROGRESS and type shall be Watching
307 when(restMso.GetForObject(endsWith(SERVICE_REQUEST_ID), eq(AsyncRequestStatus.class))).
308 thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
309 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS, JobType.Watching);
310 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
312 //mock mso to answer 200 of create vnf instance request, pull+execute vnf job, STATUS resource in progress
314 when(restMso.PostForObject(any(), endsWith(SERVICE_INSTANCE_ID + "/vnfs"), eq(RequestReferencesContainer.class))).thenReturn(
315 createResponse(200, UUID.randomUUID().toString(), VNF_REQUEST_ID));
316 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.CREATING, JobStatus.RESOURCE_IN_PROGRESS, JobType.VnfInProgressStatus);
318 //verify service job STATUS in progress
319 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS, JobType.Watching);
320 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
322 //mock mso to answer msoVnfStatus (COMPLETE/FAILED) for vnf creation status,
323 //job status shall be final (COMPLETE/COMPLETE_WITH_ERRORS)
325 when(restMso.GetForObject(endsWith(VNF_REQUEST_ID), eq(AsyncRequestStatus.class))).thenReturn(
326 asyncRequestStatusResponseAsRestObject(msoVnfStatus));
// the VNF job reaches its final status first; the service is still IN_PROGRESS until its own job runs again
327 pullJobProcessAndPushBack(JobStatus.RESOURCE_IN_PROGRESS, expectedVnfStatus, false);
328 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
329 pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, expectedServiceStatus, true);
330 singleServicesAndAssertStatus(expectedServiceStatus, uuid);
335 This test is almost a duplicate of testStatusesOfServiceDuringALaCarteLifeCycleIgnoringVfModules.
337 The IgnoringVfModules test checks the scenario where FLAG_ASYNC_ALACARTE_VFMODULE is off.
338 The WithVfModules test checks the scenario where FLAG_ASYNC_ALACARTE_VFMODULE is on.
340 We should later consider removing testStatusesOfServiceDuringALaCarteLifeCycleIgnoringVfModules
341 and merging these tests into a single one.
// A-la-carte life cycle of a service with one VNF and two vfModules (a base module without a
// volume group, then a non-base module with a volume group), with both
// FLAG_ASYNC_ALACARTE_VNF and FLAG_ASYNC_ALACARTE_VFMODULE on. MSO is mocked per endpoint /
// request id, and after each pull+execute the resulting job status and type are asserted.
// NOTE(review): some short source lines (e.g. 399-401, 405-407) are not visible in this excerpt.
345 public void testALaCarteLifeCycle1Vnf2VfModules() {
348 String msoVnfStatus = COMPLETE_STR;
349 JobStatus expectedServiceStatus = IN_PROGRESS;
350 JobStatus expectedVnfStatus = RESOURCE_IN_PROGRESS;
351 when(featureManager.isActive(Features.FLAG_ASYNC_ALACARTE_VNF)).thenReturn(true);
352 when(featureManager.isActive(Features.FLAG_ASYNC_ALACARTE_VFMODULE)).thenReturn(true);
// fresh ids per run; the local SERVICE_INSTANCE_ID shadows the class-level constant of the same name
353 final String SERVICE_REQUEST_ID = UUID.randomUUID().toString();
354 final String SERVICE_INSTANCE_ID = UUID.randomUUID().toString();
355 final String VNF_REQUEST_ID = UUID.randomUUID().toString();
356 final String VNF_INSTANCE_ID = UUID.randomUUID().toString();
357 final String VG_REQUEST_ID = UUID.randomUUID().toString();
358 final String VG_INSTANCE_ID = UUID.randomUUID().toString();
359 final String VF_MODULE_REQUEST_ID = UUID.randomUUID().toString();
360 final String VF_MODULE_REQUEST_ID2 = UUID.randomUUID().toString();
363 //push alacarte with 1 vnf, verify STATUS pending
364 UUID uuid = pushALaCarteWithVnf();
365 singleServicesAndAssertStatus(JobStatus.PENDING, uuid);
367 /*---------- service -----------*/
369 //mock mso to answer 200 of create service instance request, verify STATUS in progress
370 when(restMso.PostForObject(any(), endsWith("serviceInstances"), eq(RequestReferencesContainer.class))).thenReturn(
371 createResponse(200, SERVICE_INSTANCE_ID, SERVICE_REQUEST_ID));
372 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.PENDING, JobStatus.IN_PROGRESS, JobType.InProgressStatus);
373 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
375 //mock mso to answer COMPLETE for service instance create, job status shall remain IN_PROGRESS and type shall be Watching
377 when(restMso.GetForObject(endsWith(SERVICE_REQUEST_ID), eq(AsyncRequestStatus.class))).
378 thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
379 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS, JobType.Watching);
380 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
382 /*---------- vnf -----------*/
384 //mock mso to answer 200 of create vnf instance request, pull+execute vnf job, STATUS resource in progress
386 when(restMso.PostForObject(any(), endsWith(SERVICE_INSTANCE_ID + "/vnfs"), eq(RequestReferencesContainer.class))).thenReturn(
387 createResponse(200, VNF_INSTANCE_ID, VNF_REQUEST_ID));
388 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.CREATING, JobStatus.RESOURCE_IN_PROGRESS, JobType.VnfInProgressStatus);
390 //verify service job STATUS in progress
391 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS, JobType.Watching);
392 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
394 //mock mso to answer COMPLETE for vnf creation status;
395 //the vnf job is then expected to stay RESOURCE_IN_PROGRESS with type WatchingBaseModule
397 when(restMso.GetForObject(endsWith(VNF_REQUEST_ID), eq(AsyncRequestStatus.class))).
398 thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
// vfModule 0 is stubbed as the base module, vfModule 1 as a non-base module
402 when(commandUtils.isVfModuleBaseModule(SERVICE_MODEL_VERSION_ID, VF_MODULE_0_MODEL_VERSION_ID)).thenReturn(true);
403 when(commandUtils.isVfModuleBaseModule(SERVICE_MODEL_VERSION_ID, VF_MODULE_1_MODEL_VERSION_ID)).thenReturn(false);
404 } catch (AsdcCatalogException e) {
408 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.RESOURCE_IN_PROGRESS, JobStatus.RESOURCE_IN_PROGRESS, JobType.WatchingBaseModule);
409 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
410 pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS, true);
411 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
413 /*---------- vf Module without volume group name (base) -----------*/
415 //vg name not exist, so vf module created immediately
416 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.CREATING, JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching);
418 //verify vnf/volumeGroup job STATUS still watching with resource in progress
419 pullMultipleJobsFindExpectedProcessAndPushBack(JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching, JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching);
421 //mock mso to answer 200 of create vfModule instance request, pull+execute vfModule job, STATUS resource in progress
423 when(restMso.PostForObject(any(), endsWith(SERVICE_INSTANCE_ID + "/vnfs/" + VNF_INSTANCE_ID + "/vfModules"), eq(RequestReferencesContainer.class))).thenReturn(
424 createResponse(200, UUID.randomUUID().toString(), VF_MODULE_REQUEST_ID));
425 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.CREATING, JobStatus.RESOURCE_IN_PROGRESS, JobType.ResourceInProgressStatus);
427 //mock mso to answer for vf module orchestration request
429 when(restMso.GetForObject(endsWith(VF_MODULE_REQUEST_ID), eq(AsyncRequestStatus.class))).thenReturn(
430 asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
431 pullMultipleJobsFindExpectedProcessAndPushBack(JobStatus.RESOURCE_IN_PROGRESS, JobType.ResourceInProgressStatus, JobStatus.COMPLETED, JobType.ResourceInProgressStatus);
433 //verify volume group become completed
434 pullMultipleJobsFindExpectedProcessAndPushBack(JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching, JobStatus.COMPLETED, JobType.Watching);
436 //vnf become watching after volume group completed, and new volume group created
437 pullMultipleJobsFindExpectedProcessAndPushBack(JobStatus.RESOURCE_IN_PROGRESS, JobType.WatchingBaseModule, JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching);
439 /*---------- volume group & vf module (non base) -----------*/
441 /*---------- volume group -----------*/
443 //mock mso to answer 200 of create volumeGroup instance request, pull+execute volumeGroup job, STATUS resource in progress
445 when(restMso.PostForObject(any(), endsWith(SERVICE_INSTANCE_ID + "/vnfs/" + VNF_INSTANCE_ID + "/volumeGroups"), eq(RequestReferencesContainer.class))).thenReturn(
446 createResponse(200, VG_INSTANCE_ID, VG_REQUEST_ID));
447 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.CREATING, JobStatus.RESOURCE_IN_PROGRESS, JobType.VolumeGroupInProgressStatus);
449 //verify vnf job STATUS still watching with resource in progress
450 pullMultipleJobsFindExpectedProcessAndPushBack(JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching, JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching);
452 //mock mso to answer for volume group orchestration request
454 when(restMso.GetForObject(endsWith(VG_REQUEST_ID), eq(AsyncRequestStatus.class))).thenReturn(
455 asyncRequestStatusResponseAsRestObject(msoVnfStatus));
456 pullMultipleJobsFindExpectedProcessAndPushBack(JobStatus.RESOURCE_IN_PROGRESS, JobType.VolumeGroupInProgressStatus, JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching);
458 /*---------- vfModule -----------*/
460 //mock mso to answer 200 of create vfModule instance request, pull+execute vfModule job, STATUS resource in progress
462 when(restMso.PostForObject(any(), endsWith(SERVICE_INSTANCE_ID + "/vnfs/" + VNF_INSTANCE_ID + "/vfModules"), eq(RequestReferencesContainer.class))).thenReturn(
463 createResponse(200, UUID.randomUUID().toString(), VF_MODULE_REQUEST_ID2));
464 pullJobProcessAndPushBackWithTypeAssertion(JobStatus.CREATING, JobStatus.RESOURCE_IN_PROGRESS, JobType.ResourceInProgressStatus);
466 //mock mso to answer for vf module orchestration request
468 when(restMso.GetForObject(endsWith(VF_MODULE_REQUEST_ID2), eq(AsyncRequestStatus.class))).thenReturn(
469 asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
470 pullMultipleJobsFindExpectedProcessAndPushBack(JobStatus.RESOURCE_IN_PROGRESS, JobType.ResourceInProgressStatus, JobStatus.COMPLETED, JobType.ResourceInProgressStatus);
472 //execute twice - 1 for parent volume group, 1 for parent vnf
473 pullAllJobProcessAndPushBackByType(JobStatus.RESOURCE_IN_PROGRESS, JobType.Watching , JobStatus.COMPLETED);
// the service job is the last to finish
475 singleServicesAndAssertStatus(JobStatus.IN_PROGRESS, uuid);
476 pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.COMPLETED, true);
477 singleServicesAndAssertStatus(JobStatus.COMPLETED, uuid);
// While aaiClient.isNodeTypeExistsByName(...) throws, the pulled job is expected to move to
// IN_PROGRESS and stay there (still of type MacroServiceInstantiation) while the service info
// remains PENDING. After the AAI mock is reset to answer "name does not exist" and MSO answers
// 200, the service becomes IN_PROGRESS and finally COMPLETED once MSO reports COMPLETE.
481 public void testBadAaiResponseForSearchNamesAndBackToNormal() {
482 when(aaiClient.isNodeTypeExistsByName(any(), any())).thenThrow(aaiNodeQueryBadResponseException());
483 pushMacroBulk(); //JOB shall become IN_PROGRESS but service info is still pending
484 Job job = pullJobProcessAndPushBack(PENDING, JobStatus.IN_PROGRESS, true);
485 listServicesAndAssertStatus(PENDING, PENDING, job);
487 //JOB shall remain in IN_PROGRESS
488 job = pullJobProcessAndPushBack( JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS, true);
489 //make sure the job command is still ServiceInstantiation
490 assertThat(job.getType(), is(JobType.MacroServiceInstantiation));
491 listServicesAndAssertStatus(PENDING, PENDING, job);
493 //simulate AAI back to normal, AAI return name is free, and MSO return good response
494 Mockito.reset(aaiClient); // must forget the "thenThrow"
495 when(aaiClient.isNodeTypeExistsByName(any(), any())).thenReturn(false);
496 when(restMso.PostForObject(any(),any(), eq(RequestReferencesContainer.class))).thenReturn(createResponse(200));
497 job = pullJobProcessAndPushBack( JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS, true);
498 listServicesAndAssertStatus(JobStatus.IN_PROGRESS, PENDING, job);
500 //when get job COMPLETE from MSO, service status become COMPLETED
501 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).
502 thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
503 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, COMPLETED);
504 listServicesAndAssertStatus(COMPLETED, PENDING, job);
// aaiClient reports every queried name as existing and the retry limit for getting a free name
// is set to 10: the pulled PENDING job is expected to end as FAILED and the other services as STOPPED.
// NOTE(review): the statement that pushes the bulk (source line 511) is not visible in this excerpt.
508 public void testAaiResponseNameUsedTillMaxRetries() {
509 when(aaiClient.isNodeTypeExistsByName(any(), any())).thenReturn(true);
510 asyncInstantiationBL.setMaxRetriesGettingFreeNameFromAai(10);
512 //JOB shall become FAILED (no free name can be found) and the other services shall become STOPPED
513 Job job = pullJobProcessAndPushBack(PENDING, JobStatus.FAILED, true);
514 listServicesAndAssertStatus(JobStatus.FAILED, JobStatus.STOPPED, job);
517 private Job pullJobProcessAndPushBack(JobStatus topic, JobStatus expectedNextJobStatus) {
518 return pullJobProcessAndPushBack(topic, expectedNextJobStatus, true);
// Pulls a job from `topic`, executes it with the job worker, asserts that the resulting job has
// `expectedNextJobStatus`, pushes the resulting job back to the broker and asserts that the
// broker reports that status for the pulled job's UUID.
// When pullingAssertion is true, additionally asserts (before the push back) that a second
// pull on the same topic returns nothing.
// NOTE(review): source lines 524-540 are only partially visible here (e.g. the return statement
// and whatever guards the push back "even if any assertion failure") — confirm against the full file.
521 //return the pulled job (and not the pushed job)
522 private Job pullJobProcessAndPushBack(JobStatus topic, JobStatus expectedNextJobStatus, boolean pullingAssertion) {
523 Optional<Job> job = pullJob(topic, pullingAssertion);
525 Job nextJob = jobWorker.executeJobAndGetNext(job.get());
528 assertThat("next job not ok: " + nextJob.getData(), nextJob.getStatus(), is(expectedNextJobStatus));
530 if (pullingAssertion) {
531 //assert another pulling on same topic return no result (before push back)
532 assertFalse(jobsBrokerService.pull(topic, randomUuid()).isPresent());
536 jobsBrokerService.pushBack(nextJob); // push back to let retries - even if any assertion failure
// the broker must now report the new status under the UUID of the job that was pulled
538 assertThat(jobsBrokerService.peek(job.get().getUuid()).getStatus(), is(expectedNextJobStatus));
542 private Job pullJobProcessAndPushBackWithTypeAssertion(JobStatus topic, JobStatus expectedNextJobStatus,
543 JobType expectedNextJobType) {
544 Job job = pullJobProcessAndPushBack(topic, expectedNextJobStatus, false);
545 assertThat("job not ok: " + job.getData(), job.getType(), is(expectedNextJobType));
549 private Job pullJobProcessAndPushBackWithTypeAssertion(JobStatus topic, JobStatus expectedNextJobStatus,
550 JobType expectedNextJobType, int retries) {
551 return retryWithAssertionsLimit(retries, () -> {
552 return pullJobProcessAndPushBackWithTypeAssertion(topic, expectedNextJobStatus, expectedNextJobType);
556 private Job pullJobProcessAndPushBackWithTypeAssertion(JobStatus topic, JobStatus expectedNextJobStatus,
557 JobType expectedNextJobType, Action actionPhase, InternalState internalState, int retries) {
558 return retryWithAssertionsLimit(retries, () -> {
559 Job job = pullJobProcessAndPushBackWithTypeAssertion(topic, expectedNextJobStatus, expectedNextJobType);
560 assertThat("job not ok: " + job.getData(), job.getData(), is(jsonPartEquals("actionPhase", actionPhase.name())));
561 if (internalState != null) {
562 assertThat("job not ok: " + job.getData(), job.getData(), is(jsonPartEquals("internalState", internalState.name())));
// Calls supplier.get() and returns its result; an AssertionError thrown by the supplier is
// recorded in `history` and the call is repeated while fewer than `retries` errors were recorded.
// When the attempts are used up, throws a new AssertionError whose message joins the
// whitespace-normalised messages of all recorded errors and whose cause is history.peek()
// (the most recently recorded error).
// NOTE(review): source lines 570-572, 576, 578-579 and 584 are not visible in this excerpt — the
// loop/try openers are implied by the visible "} catch" / "} while" lines, and a pipeline step
// before collect(...) may be missing. Confirm against the full file.
568 private Job retryWithAssertionsLimit(int retries, Supplier<Job> supplier) {
569 java.util.Stack<AssertionError> history = new Stack<>();
573 return supplier.get();
574 } catch (AssertionError assertionError) {
575 history.push(assertionError);
577 } while (history.size() < retries);
580 throw new AssertionError("No luck while all of these assertion errors: " + history.stream()
581 .map(Throwable::getMessage)
// flatten each message to a single line and collapse runs of whitespace
582 .map(s -> s.replace('\n', ' '))
583 .map(s -> s.replaceAll("\\s{2,}"," "))
585 .collect(joining("\n ", "\n ", "")), history.peek());
// Keeps pulling jobs from `topic` until one of type `expectedCurrentJobType` is found; jobs of
// any other type are collected in `pulledJobs`. The matching job is executed, the resulting
// job's status and type are asserted, it is pushed back and its status is re-checked through
// the broker. Finally the collected non-matching jobs are pushed back unprocessed.
// pullJob(topic, false) fails the test when nothing can be pulled, so the loop ends with an
// assertion failure if no job of the expected type exists on the topic.
// NOTE(review): source lines 591, 596-598 and 608-611 (the lastJob declaration, closing braces
// and the return statement) are not visible in this excerpt — confirm against the full file.
588 private Job pullMultipleJobsFindExpectedProcessAndPushBack(JobStatus topic, JobType expectedCurrentJobType, JobStatus expectedNextJobStatus,
589 JobType expectedNextJobType) {
590 List<Job> pulledJobs = new ArrayList<>();
592 while (lastJob == null || lastJob.getType() != expectedCurrentJobType) {
593 lastJob = pullJob(topic, false).get();
594 if (lastJob.getType() != expectedCurrentJobType) {
595 pulledJobs.add(lastJob);
599 Job nextJob = jobWorker.executeJobAndGetNext(lastJob);
600 assertThat(nextJob.getStatus(), is(expectedNextJobStatus));
601 assertThat(nextJob.getType(), is(expectedNextJobType));
603 jobsBrokerService.pushBack(nextJob);
604 assertThat(jobsBrokerService.peek(nextJob.getUuid()).getStatus(), is(expectedNextJobStatus));
// hand back the jobs that were pulled only to get past them
606 pulledJobs.forEach(job ->
607 jobsBrokerService.pushBack(job)
613 private void pullAllJobProcessAndPushBackByType(JobStatus topic, JobType commandType, JobStatus expectedFinalStatus) {
614 Map<UUID, JobStatus> jobStatusMap = new HashMap<>();
615 Optional<Job> job = pullJob(topic, false);
616 for (int i=0; i<1000 && job.isPresent() && job.get().getType() == commandType; i++) {
617 Job nextJob = jobWorker.executeJobAndGetNext(job.get());
618 jobStatusMap.put(nextJob.getUuid(), nextJob.getStatus());
619 jobsBrokerService.pushBack(nextJob);
620 job = jobsBrokerService.pull(topic, UUID.randomUUID().toString());
622 assertThat(jobStatusMap.values(), everyItem(is(expectedFinalStatus)));
626 private Optional<Job> pullJob(JobStatus topic, boolean pullingAssertion) {
627 if (pullingAssertion) {
628 //assert pulling on inverse topic return no result
629 assertFalse(jobsBrokerService.pull(inverseTopic(topic), randomUuid()).isPresent());
632 Optional<Job> job = jobsBrokerService.pull(topic, randomUuid());
633 assertTrue("no job fetched", job.isPresent());
635 if (pullingAssertion) {
636 //assert another pulling on same topic return no result
637 assertFalse(jobsBrokerService.pull(topic, randomUuid()).isPresent());
643 private JobStatus inverseTopic(JobStatus topic) {
644 return topic==JobStatus.IN_PROGRESS ? PENDING : JobStatus.IN_PROGRESS;
649 public void whenPushNewBulk_andGetNoResponseFromMsoOnCreation_thenServiceMoveToFailedAndOtherToStopped() {
650 when(restMso.PostForObject(any(), any(), eq(RequestReferencesContainer.class))).thenReturn(createResponse(500));
651 pushBulkPullPendingJobAndAssertJobStatus(JobStatus.FAILED, JobStatus.STOPPED);
// When MSO reports PENDING_MANUAL_TASK the job itself stays IN_PROGRESS while the service
// status is expected to be PAUSE; once MSO reports IN_PROGRESS again the service returns to
// IN_PROGRESS, and COMPLETE from MSO completes it. The other services of the bulk stay PENDING
// throughout. At the end the MSO and VID audit trails of the first job are compared with the
// expected sequences.
655 public void whenMsoStatusIsPendingManualTask_ThenJobStatusIsPaused() {
656 when(restMso.PostForObject(any(), any(), eq(RequestReferencesContainer.class))).thenReturn(createResponse(200));
658 Job firstJob = pushBulkPullPendingJobAndAssertJobStatus(JobStatus.IN_PROGRESS, PENDING);
660 //when MSO reports PENDING_MANUAL_TASK, the job stays IN_PROGRESS and the service status becomes PAUSE
661 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).
662 thenReturn(asyncRequestStatusResponseAsRestObject(PENDING_MANUAL_TASK));
663 Job job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS);
664 listServicesAndAssertStatus(PAUSE, PENDING, job);
666 //The paused job is pulled and remain in pause state. Other jobs from bulk remain pending
667 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS);
668 listServicesAndAssertStatus(PAUSE, PENDING, job);
670 //the job get IN_PROGRESS response (simulate activate operation) and status changed to IN_PROGRESS
671 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).
672 thenReturn(asyncRequestStatusResponseAsRestObject(IN_PROGRESS_STR));
673 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.IN_PROGRESS);
674 listServicesAndAssertStatus(JobStatus.IN_PROGRESS, PENDING, job);
// MSO reports COMPLETE: the service becomes COMPLETED
676 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).
677 thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
678 job = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, COMPLETED);
679 listServicesAndAssertStatus(COMPLETED, PENDING, job);
681 //Pulling PENDING job return another job
682 assertThat(jobsBrokerService.pull(PENDING, randomUuid()).get().getUuid(), not(equalTo(job.getUuid())));
// audit trail recorded for the first job, as seen from MSO
685 ImmutableList<String> expectedStatusesForMso = ImmutableList.of(REQUESTED, PENDING_MANUAL_TASK, IN_PROGRESS_STR, COMPLETE_STR);
686 List<String> msoStatuses = asyncInstantiationBL.getAuditStatuses(firstJob.getUuid(), MSO).stream().map(x -> x.getJobStatus()).collect(Collectors.toList());
687 assertThat(msoStatuses, is(expectedStatusesForMso));
// audit trail recorded for the first job, as seen from VID
689 ImmutableList<String> expectedStatusesForVid = statusesToStrings(PENDING, IN_PROGRESS, PAUSE, IN_PROGRESS, COMPLETED);
690 List<String> vidStatuses = asyncInstantiationBL.getAuditStatuses(firstJob.getUuid(), VID).stream().map(x -> x.getJobStatus()).collect(Collectors.toList());
691 assertThat(vidStatuses, is(expectedStatusesForVid));
// Delegates to pullPendingJobAndAssertJobStatus with the same expected statuses and returns the job it pulled.
// NOTE(review): the bulk push implied by the method name is not visible in this excerpt - confirm it happens before the delegation.
694 private Job pushBulkPullPendingJobAndAssertJobStatus(JobStatus pulledJobStatus, JobStatus otherJobsStatus) {
696 return pullPendingJobAndAssertJobStatus(pulledJobStatus, otherJobsStatus);
699 private Job pullPendingJobAndAssertJobStatus(JobStatus pulledJobStatus, JobStatus otherJobsStatus) {
700 Job job = pullJobProcessAndPushBack(PENDING, pulledJobStatus, false);
701 listServicesAndAssertStatus(pulledJobStatus, otherJobsStatus, job);
// Checks that jobs belonging to two different template ids progress independently: an MSO failure
// moves one bulk to FAILED/STOPPED while the other bulk keeps advancing until all its jobs are COMPLETED.
// NOTE(review): the statements that push the bulks are not visible in this excerpt - confirm against the full file.
706 public void test2BulksLifeCyclesAreIndependent() {
708 when(restMso.PostForObject(any(), any(), eq(RequestReferencesContainer.class))).thenReturn(createResponse(200));
709 //push 2nd job, then when pulling first job the job become in_progress, other jobs (from 2 bulks) remain pending
710 Job firstJob = pushBulkPullPendingJobAndAssertJobStatus(JobStatus.IN_PROGRESS, PENDING);
712 //assert we can pull another job from pending from other template id
713 Job secondJob = pullJobProcessAndPushBack(PENDING, JobStatus.IN_PROGRESS, false);
714 assertThat(firstJob.getTemplateId(), not(equalTo(secondJob.getTemplateId())));
716 //assert no more PENDING jobs to pull
717 assertFalse(jobsBrokerService.pull(PENDING, randomUuid()).isPresent());
719 //when get FAILED status from MSO statuses for failed bulk are: FAILED, STOPPED, for other bulk: IN_PROGRESS, 2 pending
720 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).
721 thenReturn(asyncRequestStatusResponseAsRestObject(FAILED_STR));
722 Job failedJob = pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, JobStatus.FAILED, false);
// group the listed services by template id so each bulk can be asserted separately
723 Map<UUID, List<ServiceInfo>> servicesByTemplateId =
724 asyncInstantiationBL.getAllServicesInfo()
725 .stream().collect(groupingBy(ServiceInfo::getTemplateId));
726 assertServicesStatus(servicesByTemplateId.get(failedJob.getTemplateId()), JobStatus.FAILED, JobStatus.STOPPED, failedJob);
// whichever of the two pulled jobs did not fail is the one expected to keep progressing
727 Job successJob = failedJob.getUuid().equals(firstJob.getUuid()) ? secondJob : firstJob;
728 assertServicesStatus(servicesByTemplateId.get(successJob.getTemplateId()), JobStatus.IN_PROGRESS, PENDING, successJob);
730 //yet no more PENDING jobs to pull
731 assertFalse(jobsBrokerService.pull(PENDING, randomUuid()).isPresent());
733 //assert that job from non failed bulk can progress.
734 //When completed, failed bulk statuses: FAILED, STOPPED. Succeeded bulk statuses are : COMPLETED, 2 pending
735 when(restMso.GetForObject(any(), eq(AsyncRequestStatus.class))).
736 thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
737 pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, COMPLETED, false);
738 servicesByTemplateId =
739 asyncInstantiationBL.getAllServicesInfo()
740 .stream().collect(groupingBy(ServiceInfo::getTemplateId));
741 assertServicesStatus(servicesByTemplateId.get(failedJob.getTemplateId()), JobStatus.FAILED, JobStatus.STOPPED, failedJob);
742 assertServicesStatus(servicesByTemplateId.get(successJob.getTemplateId()), COMPLETED, PENDING, successJob);
744 //advance the other jobs of the succeeded bulk until all of them reach COMPLETED
745 pullJobProcessAndPushBack(PENDING, JobStatus.IN_PROGRESS, false);
746 pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, COMPLETED, false);
747 pullJobProcessAndPushBack(PENDING, JobStatus.IN_PROGRESS, false);
748 pullJobProcessAndPushBack(JobStatus.IN_PROGRESS, COMPLETED, false);
749 servicesByTemplateId =
750 asyncInstantiationBL.getAllServicesInfo()
751 .stream().collect(groupingBy(ServiceInfo::getTemplateId));
752 assertServicesStatus(servicesByTemplateId.get(failedJob.getTemplateId()), JobStatus.FAILED, JobStatus.STOPPED, failedJob);
753 assertServicesStatus(servicesByTemplateId.get(successJob.getTemplateId()), COMPLETED, COMPLETED, successJob);
755 //assert no more PENDING jobs nor IN_PROGRESS jobs to pull
756 assertFalse(jobsBrokerService.pull(PENDING, randomUuid()).isPresent());
757 assertFalse(jobsBrokerService.pull(JobStatus.IN_PROGRESS, randomUuid()).isPresent());
// Drives an a-la-carte service update with instance groups through the job queue: after each pull it
// asserts the job status, job type, phase and internal state, and at checkpoints the queue sizes per status.
// NOTE(review): the step checklist right below the signature is plain text, presumably wrapped in a block
// comment whose delimiters are not visible in this excerpt - confirm against the full file.
760 public void deploy2NewGroupsToServiceWith1ExistingGroup() {
763 new feature: skip service (existing impl) and skip group (new impl)
764 service+group aren't touched, 2 groups ARE created
766 [v] success if all GROUPs success
768 Next test variation should:
769 [ ] error if all GROUPs error
770 [ ] completed with error if 1 GROUP error
773 [v] + service with 3 groups - 1 action=none, 2 action=create; service's action=none
774 [v] verify STATUS pending
775 [v] + pull+execute (should NOT post to MSO)
776 [v] verify STATUS in progress; TYPE watching
778 [v] verify job#2 *new* GROUP job STATUS completed with no action TYPE group INTERNAL STATE terminal PHASE delete
779 [v] verify job#3 *new* GROUP job STATUS completed with no action TYPE group INTERNAL STATE terminal PHASE delete
780 [v] verify job#4 *new* GROUP job STATUS completed with no action TYPE group INTERNAL STATE terminal PHASE delete
782 [v] + pull+execute job#1 (should NOT post to MSO)
783 [v] verify STATUS in progress; TYPE watching
784 [v] verify job#5 *new* GROUP job STATUS creating TYPE group INTERNAL STATE initial PHASE create
785 [v] verify job#6 *new* GROUP job STATUS creating TYPE group INTERNAL STATE initial PHASE create
786 [v] verify job#7 *new* GROUP job STATUS creating TYPE group INTERNAL STATE initial PHASE create
788 [v] + pull+execute job#5 (should NOT post to MSO)
789 [v] verify job#5 STATUS completed with no action TYPE group INTERNAL STATE terminal PHASE create
790 [v] + pull+execute job#1
791 [v] verify job#1 STATUS in progress; TYPE watching
793 [v] + pull+execute job#6 (should post to MSO)
794 [v] verify job#6 STATUS resource in progress
795 [v] + pull+execute job#1
796 [v] verify job#1 STATUS in progress; TYPE watching
797 [v] + pull+execute job#6 (should get from MSO)
798 [v] verify job#6 STATUS completed
799 [v] + pull+execute job#1
800 [v] verify job#1 STATUS in progress; TYPE watching
802 [v] + pull+execute job#7 (should post to MSO)
803 [v] verify job#7 STATUS resource in progress
804 [v] + pull+execute job#1
805 [v] verify job#1 STATUS in progress; TYPE watching
806 [v] + pull+execute job#7 (should get from MSO)
807 [v] verify job#7 STATUS completed
808 [v] + pull+execute job#1
809 [v] verify job#1 STATUS completed
// random request/instance ids handed out by the stubbed MSO responses further down
813 final String GROUP1_REQUEST_ID = UUID.randomUUID().toString();
814 final String GROUP1_INSTANCE_ID = UUID.randomUUID().toString();
815 final String GROUP2_REQUEST_ID = UUID.randomUUID().toString();
816 final String GROUP2_INSTANCE_ID = UUID.randomUUID().toString();
// helper: pulls the IN_PROGRESS a-la-carte service job and asserts the given phase, the WATCHING state and the next status
819 final BiConsumer<Action, JobStatus> verify_Job1InProgress = (phase, nextJobStatus) -> {
820 pullJobProcessAndPushBackWithTypeAssertion(IN_PROGRESS, nextJobStatus, JobType.ALaCarteService, phase, InternalState.WATCHING, 2);
823 //service with 3 groups - 1 action=none, 2 action=create; service's action=none
824 UUID uuid = pushALaCarteUpdateWithGroups();
825 singleServicesAndAssertStatus(PENDING, uuid);
827 // take from pending, put in-progress -> 3 delete-child were born
828 pullJobProcessAndPushBackWithTypeAssertion(PENDING, IN_PROGRESS, JobType.ALaCarteService, Action.Delete, InternalState.WATCHING, 1);
829 verifyQueueSizes(ImmutableMap.of(
830 IN_PROGRESS, 1, CREATING, 3
833 Stream.of(1, 2, 3).forEach(i -> {
834 // take each child creating, put in-progress
835 verify_Job1InProgress.accept(Action.Delete, IN_PROGRESS);
836 pullJobProcessAndPushBackWithTypeAssertion(CREATING, RESOURCE_IN_PROGRESS, JobType.InstanceGroup, Action.Delete, null, 1);
838 // execute each in-progress -> job is completed
839 verify_Job1InProgress.accept(Action.Delete, IN_PROGRESS);
840 pullJobProcessAndPushBackWithTypeAssertion(RESOURCE_IN_PROGRESS, COMPLETED/*_WITH_NO_ACTION*/, JobType.InstanceGroup,1);
842 verifyQueueSizes(ImmutableMap.of(
843 IN_PROGRESS, 1, COMPLETED, 3
846 // take job #1 from phase delete to phase create -> 3 create-child were born
847 verify_Job1InProgress.accept(Action.Create, IN_PROGRESS);
848 verifyQueueSizes(ImmutableMap.of(
849 IN_PROGRESS, 1, CREATING, 3, COMPLETED, 3
// stub MSO: POSTs to a path ending with "instanceGroups" answer with the group 1 ids first, then the group 2 ids;
// status GETs whose uri ends with either request id answer COMPLETE_STR
853 when(restMso.PostForObject(any(), endsWith("instanceGroups"), eq(RequestReferencesContainer.class)))
854 .thenReturn(createResponse(200, GROUP1_INSTANCE_ID, GROUP1_REQUEST_ID))
855 .thenReturn(createResponse(200, GROUP2_INSTANCE_ID, GROUP2_REQUEST_ID))
857 when(restMso.GetForObject(argThat(uri -> StringUtils.endsWithAny(uri, GROUP1_REQUEST_ID, GROUP2_REQUEST_ID)), eq(AsyncRequestStatus.class))).
858 thenReturn(asyncRequestStatusResponseAsRestObject(COMPLETE_STR));
860 // take first "none" child from creating to completed
861 // note there's no concrete mechanism that makes the first child be
862 // the "action=None" case, but that's what happens, and following line
863 // relies on that fact.
864 pullJobProcessAndPushBackWithTypeAssertion(CREATING, COMPLETED_WITH_NO_ACTION, JobType.InstanceGroupInstantiation, 1);
866 // take each of next two children from creating to in-progress, then to completed
867 // verify job #1 is watching, and MSO is getting requests
868 Stream.of(1, 2).forEach(i -> {
869 verify_Job1InProgress.accept(Action.Create, IN_PROGRESS);
870 pullJobProcessAndPushBackWithTypeAssertion(CREATING, RESOURCE_IN_PROGRESS, JobType.ResourceInProgressStatus);
871 verify(restMso, times(i)).PostForObject(any(), any(), any());
873 verify_Job1InProgress.accept(Action.Create, IN_PROGRESS);
874 pullJobProcessAndPushBackWithTypeAssertion(RESOURCE_IN_PROGRESS, COMPLETED, JobType.ResourceInProgressStatus);
875 verify(restMso, times(i)).GetForObject(any(), any());
878 // job #1 is done as all children are done
879 verify_Job1InProgress.accept(Action.Create, COMPLETED);
880 verifyQueueSizes(ImmutableMap.of(COMPLETED, 7));
883 private void verifyQueueSizes(ImmutableMap<JobStatus, Integer> expected) {
884 final Collection<Job> peek = jobsBrokerService.peek();
885 final Map<JobStatus, Long> jobTypes = peek.stream().collect(groupingBy(Job::getStatus, counting()));
886 assertThat(jobTypes, is(expected));
889 private List<ServiceInfo> listServicesAndAssertStatus(JobStatus pulledJobStatus, JobStatus otherJobsStatus, Job job) {
890 List<ServiceInfo> serviceInfoList = asyncInstantiationBL.getAllServicesInfo();
891 assertServicesStatus(serviceInfoList, pulledJobStatus, otherJobsStatus, job);
893 return serviceInfoList;
896 private ServiceInfo singleServicesAndAssertStatus(JobStatus expectedStatus, UUID jobUUID) {
897 List<ServiceInfo> serviceInfoList = asyncInstantiationBL.getAllServicesInfo();
898 assertEquals(1, serviceInfoList.size());
899 ServiceInfo serviceInfo = serviceInfoList.get(0);
900 assertThat(serviceInfo.getJobStatus(), is(expectedStatus));
901 assertThat(serviceInfo.getJobId(), is(jobUUID));
905 private void assertServicesStatus(List<ServiceInfo> serviceInfoList, JobStatus pulledJobStatus, JobStatus otherJobsStatus, Job job) {
906 serviceInfoList.forEach(si->{
907 if (si.getJobId().equals(job.getUuid())) {
908 assertThat(si.getJobStatus(), is(pulledJobStatus));
911 assertThat(si.getJobStatus(), is(otherJobsStatus));
916 private void listServicesAndAssertStatus(Map<UUID, JobStatus> expectedJobStatusMap) {
917 Map<UUID, JobStatus> actualStatuses = asyncInstantiationBL.getAllServicesInfo()
918 .stream().collect(Collectors.toMap(ServiceInfo::getJobId, ServiceInfo::getJobStatus));
919 assertThat(actualStatuses.entrySet(), equalTo(expectedJobStatusMap.entrySet()));
922 private String randomUuid() {
923 return UUID.randomUUID().toString();