2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.pipelines;
 
  23 import static org.junit.jupiter.api.Assertions.assertEquals;
 
  24 import static org.mockito.ArgumentMatchers.any;
 
  25 import static org.mockito.ArgumentMatchers.anyString;
 
  26 import static org.mockito.Mockito.times;
 
  27 import static org.mockito.Mockito.verify;
 
  28 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
  29 import static org.mockito.Mockito.verifyZeroInteractions;
 
  30 import static org.mockito.Mockito.when;
 
  31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_REREGISTRATION_TASK_NAME;
 
  32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
 
  33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
 
  35 import java.util.Arrays;
 
  36 import java.util.Collections;
 
  37 import java.util.HashMap;
 
  38 import java.util.UUID;
 
  40 import javax.net.ssl.SSLException;
 
  42 import org.junit.jupiter.api.BeforeEach;
 
  43 import org.junit.jupiter.api.DisplayName;
 
  44 import org.junit.jupiter.api.Test;
 
  45 import org.mockito.Mockito;
 
  46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
 
  47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
 
  48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
 
  49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
 
  50 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
 
  51 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
 
  52 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
 
  53 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
 
  54 import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
 
  55 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
 
  56 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
 
  57 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
 
  58 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
 
  59 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
 
  60 import org.onap.bbs.event.processor.model.PnfAaiObject;
 
  61 import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
 
  62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
 
  63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 
  64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 
  65 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
 
  66 import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
 
  67 import org.springframework.http.HttpStatus;
 
  68 import org.springframework.http.ResponseEntity;
 
  70 import reactor.core.publisher.Flux;
 
  71 import reactor.core.publisher.Mono;
 
  72 import reactor.test.StepVerifier;
 
  74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
 
  75 @SuppressWarnings("unchecked")
 
  76 @DisplayName("PNF Re-registration Pipeline Unit-Tests")
 
  77 class ReRegistrationPipelineTest {
 
  79     private ReRegistrationPipeline pipeline; // pipeline under test
 
  80     private ApplicationConfiguration configuration; // mocked application settings
 
  81     private DmaapReRegistrationConsumerTask consumerTask; // mocked source of re-registration events
 
  82     private DmaapPublisherTask publisherTask; // mocked publisher
 
  83     private AaiClientTask aaiClientTask; // mocked A&AI client
 
  85     private ResponseEntity<String> responseEntity; // mocked reply returned by the stubbed publisher
 
  90         responseEntity = Mockito.mock(ResponseEntity.class);
             // NOTE(review): the enclosing method header is not visible in this excerpt;
             // presumably this is the per-test fixture setup - confirm.
 
             // Every collaborator is a Mockito mock so each test can stub exactly the replies it needs.
  92         configuration = Mockito.mock(ApplicationConfiguration.class);
 
  93         consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
 
  94         publisherTask = Mockito.mock(DmaapPublisherTask.class);
 
  95         aaiClientTask = Mockito.mock(AaiClientTask.class);
 
             // Canned closed-loop settings handed out by the mocked configuration.
  97         when(configuration.getReRegistrationCloseLoopControlName())
 
  98                 .thenReturn("controlName");
 
  99         when(configuration.getReRegistrationCloseLoopPolicyScope())
 
 100                 .thenReturn("policyScope");
 
 101         when(configuration.getPolicyVersion())
 
 102                 .thenReturn("1.0.0");
 
             // NOTE(review): the value stubbed for getCloseLoopTargetType() is not visible in this excerpt.
 103         when(configuration.getCloseLoopTargetType())
 
 105         when(configuration.getCloseLoopEventStatus())
 
 106                 .thenReturn("ONSET");
 
 107         when(configuration.getCloseLoopVersion())
 
 108                 .thenReturn("1.0.2");
 
 109         when(configuration.getCloseLoopTarget())
 
 110                 .thenReturn("CL-Target");
 
 111         when(configuration.getCloseLoopOriginator())
 
 112                 .thenReturn("DCAE-BBS-ep");
 
             // Pipeline under test, wired with the mocks above and a fresh, empty HashMap
             // (the role of that map is not visible from this file).
 114         pipeline = new ReRegistrationPipeline(configuration, consumerTask,
 
 115                 publisherTask, aaiClientTask, new HashMap<>());
 
 119     void handleEmptyResponseFromDmaap() throws SSLException {
             // Scenario: the consumer task fails with EmptyDmaapResponseException for any task name.
             // Expectation: neither the A&AI client nor the publisher is ever touched.
 
 121         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 122         when(consumerTask.execute(anyString()))
 
 123                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 125         StepVerifier.create(pipeline.executePipeline())
 
 126                 .expectSubscription()
 
 129         verifyZeroInteractions(aaiClientTask);
 
 130         verifyZeroInteractions(publisherTask);
 
 134     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
             // Scenario: the consumer never emits (Flux.never()) while the pipeline timeout is
             // stubbed to 1 second.
             // Expectation: neither the A&AI client nor the publisher is ever touched.
 
 137         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 138         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 139                 .thenReturn(Flux.never());
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 142         StepVerifier.create(pipeline.executePipeline())
 
 143                 .expectSubscription()
 
 146         verifyZeroInteractions(aaiClientTask);
 
 147         verifyZeroInteractions(publisherTask);
 
 151     void noResponseFromAai_PipelineTimesOut() throws SSLException {
             // Scenario: one event is consumed, but the PNF retrieval never completes (Mono.never())
             // while the pipeline timeout is stubbed to 1 second.
             // Expectation: the publisher is never touched.
 
 153         String pnfName = "olt1";
 
 154         String attachmentPoint = "olt2-2-2";
 
 155         String remoteId = "newRemoteId";
 
 156         String cvlan = "1005";
 
 157         String svlan = "100";
 
 159         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is not visible in this excerpt;
             // presumably remoteId, cvlan and svlan are supplied there - confirm.
 160         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 161                 .correlationId(pnfName)
 
 162                 .attachmentPoint(attachmentPoint)
 
 169         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 170         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 171         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 174         StepVerifier.create(pipeline.executePipeline())
 
 175                 .expectSubscription()
 
 178         verifyZeroInteractions(publisherTask);
 
 182     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
             // Scenario: the event, the PNF and the HSI CFS service instance are all available, but
             // the publisher never completes (Mono.never()) while the timeout is stubbed to 1 second.
             // Expectation: the publisher was still invoked exactly once.
 
 184         String pnfName = "olt1";
 
 185         String attachmentPoint = "olt2-2-2";
 
 186         String remoteId = "newRemoteId";
 
 187         String cvlan = "1005";
 
 188         String svlan = "100";
 
 189         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 191         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is not visible in this excerpt.
 192         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 193                 .correlationId(pnfName)
 
 194                 .attachmentPoint(attachmentPoint)
 
             // The stored PNF is linked to "olt1-1-1", which differs from the event's "olt2-2-2".
 200         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
 
 201         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 202                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
             // URL the service-instance stub below is keyed on.
 205         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 206                 hsiCfsServiceInstance.getServiceInstanceId());
 
 208         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 209         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 211         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 212                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the opening line of this stubbing is not visible in this excerpt.
 215                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 216                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 218         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
 
 220         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 221         StepVerifier.create(pipeline.executePipeline())
 
 222                 .expectSubscription()
 
             // verify(...) without a mode checks for exactly one invocation.
 225         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 229     void singleCorrectEvent_PnfHavingNoLogicalLink_handleGracefully() throws SSLException {
             // Scenario: the PNF returned by A&AI is built by constructPnfObjectWithoutLogicalLink.
             // Expectation: the PNF is retrieved once, nothing else is asked of the A&AI client,
             // and the publisher is never touched.
 
 231         String pnfName = "olt1";
 
 232         String attachmentPoint = "olt2-2-2";
 
 233         String remoteId = "newRemoteId";
 
 234         String cvlan = "1005";
 
 235         String svlan = "100";
 
 236         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 238         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is not visible in this excerpt.
 239         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 240                 .correlationId(pnfName)
 
 241                 .attachmentPoint(attachmentPoint)
 
 247         PnfAaiObject pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
 
 248         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 249                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
 252         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 253                 hsiCfsServiceInstance.getServiceInstanceId());
 
 255         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 256         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 258         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 259                 .thenReturn(Mono.just(pnfAaiObject));
 
             // The service-instance and publisher stubs below are set up even though the
             // verifications at the end expect neither call to happen.
             // NOTE(review): the opening line of this stubbing is not visible in this excerpt.
 262                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 263                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 265         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 266         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 268         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 269         StepVerifier.create(pipeline.executePipeline())
 
 270                 .expectSubscription()
 
 273         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 274         verifyNoMoreInteractions(aaiClientTask);
 
 275         verifyZeroInteractions(publisherTask);
 
 279     void singleCorrectEvent_handleSuccessfully() throws SSLException {
             // Scenario: one event whose attachment point ("olt2-2-2") differs from the one stored
             // on the PNF ("old-attachment-point"); all collaborators reply successfully.
             // Expectation: one emitted response with status OK and exactly one publish call.
 
 281         String pnfName = "olt1";
 
 282         String attachmentPoint = "olt2-2-2";
 
 283         String remoteId = "newRemoteId";
 
 284         String cvlan = "1005";
 
 285         String svlan = "100";
 
 286         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 288         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is not visible in this excerpt.
 289         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 290                 .correlationId(pnfName)
 
 291                 .attachmentPoint(attachmentPoint)
 
 297         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
 
 298         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 299                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
             // URL the service-instance stub below is keyed on.
 302         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 303                 hsiCfsServiceInstance.getServiceInstanceId());
 
 305         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 306         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 308         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 309                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the opening line of this stubbing is not visible in this excerpt.
 312                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 313                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
             // The publisher answers with the mocked response, whose status is stubbed to OK.
 315         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 316         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 318         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 319         StepVerifier.create(pipeline.executePipeline())
 
 320                 .expectSubscription()
 
 321                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 324         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 328     void twoCorrectEvents_handleSuccessfully() throws SSLException {
             // Scenario: two events for two different PNFs; each PNF's stored attachment point
             // ("olt1-1-0" / "olt2-2-0") differs from the one carried by its event.
             // Expectation: two emitted responses with status OK and exactly two publish calls.
 
 330         String pnfName1 = "olt1";
 
 331         String pnfName2 = "olt2";
 
 332         String attachmentPoint1 = "olt1-1-1";
 
 333         String attachmentPoint2 = "olt2-2-2";
 
 334         String remoteId1 = "newRemoteId1";
 
 335         String remoteId2 = "newRemoteId2";
 
 336         String cvlan1 = "1005";
 
 337         String cvlan2 = "1006";
 
 338         String svlan = "100";
 
 339         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 340         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 342         // Prepare stubbed replies
 
             // NOTE(review): the rest of both builder chains is not visible in this excerpt.
 343         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 344                 .correlationId(pnfName1)
 
 345                 .attachmentPoint(attachmentPoint1)
 
 350         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 351                 .correlationId(pnfName2)
 
 352                 .attachmentPoint(attachmentPoint2)
 
 358         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
 
 359         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
 
 360         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 361                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
 
 362         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 363                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
 
             // Exact URLs the per-PNF and per-service-instance stubs below are keyed on.
 366         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 367         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 368         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 369                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 370         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 371                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 373         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 374         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 375                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 377         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 378         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the opening lines of the next two stubbings are not visible in this excerpt.
 381                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 382                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 384                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 385                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 387         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 388         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 390         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 391         StepVerifier.create(pipeline.executePipeline())
 
 392                 .expectSubscription()
 
 393                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 394                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 397         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
 
 401     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
             // Scenario: one event is consumed and the PNF retrieval fails with AaiTaskException.
             // Expectation: the PNF retrieval is attempted once, nothing else is asked of the
             // A&AI client, and the publisher is never touched.
 
 403         String pnfName = "olt1";
 
 404         String attachmentPoint = "olt2-2-2";
 
 405         String remoteId = "newRemoteId";
 
 406         String cvlan = "1005";
 
 407         String svlan = "100";
 
 409         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is not visible in this excerpt.
 410         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 411                 .correlationId(pnfName)
 
 412                 .attachmentPoint(attachmentPoint)
 
 419         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 420         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 421         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 422                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
 424         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 425         StepVerifier.create(pipeline.executePipeline())
 
 426                 .expectSubscription()
 
 429         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 430         verifyNoMoreInteractions(aaiClientTask);
 
 431         verifyZeroInteractions(publisherTask);
 
 435     void twoEvents_FirstOk_SecondNotRelocation_handleCorrectOnly() throws SSLException {
             // Scenario: two events. The first PNF's stored attachment point ("olt1-1-0") differs
             // from its event's ("olt1-1-1"); the second PNF is built with the very same attachment
             // point its event carries (attachmentPoint2).
             // Expectation: a single OK response and exactly one publish call.
             // NOTE(review): presumably the pipeline drops the second event because an unchanged
             // attachment point is not a relocation - confirm against ReRegistrationPipeline.
 
 437         String pnfName1 = "olt1";
 
 438         String pnfName2 = "olt2";
 
 439         String attachmentPoint1 = "olt1-1-1";
 
 440         String attachmentPoint2 = "olt2-2-2";
 
 441         String remoteId1 = "newRemoteId1";
 
 442         String remoteId2 = "newRemoteId2";
 
 443         String cvlan1 = "1005";
 
 444         String cvlan2 = "1006";
 
 445         String svlan = "100";
 
 446         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 447         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 449         // Prepare stubbed replies
 
             // NOTE(review): the rest of both builder chains is not visible in this excerpt.
 450         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 451                 .correlationId(pnfName1)
 
 452                 .attachmentPoint(attachmentPoint1)
 
 457         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 458                 .correlationId(pnfName2)
 
 459                 .attachmentPoint(attachmentPoint2)
 
 465         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
 
 466         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
 
 467         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 468                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
 
 469         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 470                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
 
             // Exact URLs the per-PNF and per-service-instance stubs below are keyed on.
 473         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 474         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 475         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 476                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 477         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 478                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 480         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 481         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 482                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 484         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 485         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the opening lines of the next two stubbings are not visible in this excerpt.
 488                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 489                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 491                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 492                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 494         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 495         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 497         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 498         StepVerifier.create(pipeline.executePipeline())
 
 499                 .expectSubscription()
 
 500                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 503         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 507     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
             // Scenario: two events; the first PNF retrieval succeeds and the second one fails
             // with AaiTaskException.
             // Expectation: a single OK response, two PNF retrievals, one service-instance
             // retrieval and exactly one publish call.
 
 509         String pnfName1 = "olt1";
 
 510         String pnfName2 = "olt2";
 
 511         String attachmentPoint1 = "olt1-1-1";
 
 512         String attachmentPoint2 = "olt2-2-2";
 
 513         String remoteId1 = "newRemoteId1";
 
 514         String remoteId2 = "newRemoteId2";
 
 515         String cvlan1 = "1005";
 
 516         String cvlan2 = "1006";
 
 517         String svlan = "100";
 
 518         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 520         // Prepare stubbed replies
 
             // NOTE(review): the rest of both builder chains is not visible in this excerpt.
 521         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 522                 .correlationId(pnfName1)
 
 523                 .attachmentPoint(attachmentPoint1)
 
 528         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 529                 .correlationId(pnfName2)
 
 530                 .attachmentPoint(attachmentPoint2)
 
             // Only the first PNF gets A&AI data; the second retrieval is stubbed to fail below.
 536         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
 
 537         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 538                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
 
 541         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 542                 hsiCfsServiceInstance.getServiceInstanceId());
 
 544         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 545         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 546                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // Consecutive stubbing: the first call yields the PNF, the second call yields the error.
 547         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 548                 .thenReturn(Mono.just(pnfAaiObject))
 
 549                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
             // NOTE(review): the opening line of this stubbing is not visible in this excerpt.
 551                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 552                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 554         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 555         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 557         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 558         StepVerifier.create(pipeline.executePipeline())
 
 559                 .expectSubscription()
 
 560                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 563         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
 
 564         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 565         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 569     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
             // Scenario: two events; the first PNF retrieval fails with AaiTaskException and the
             // second one succeeds.
             // Expectation: a single OK response, two PNF retrievals, one service-instance
             // retrieval and exactly one publish call.
 
 571         String pnfName1 = "olt1";
 
 572         String pnfName2 = "olt2";
 
 573         String attachmentPoint1 = "olt1-1-1";
 
 574         String attachmentPoint2 = "olt2-2-2";
 
 575         String remoteId1 = "newRemoteId1";
 
 576         String remoteId2 = "newRemoteId2";
 
 577         String cvlan1 = "1005";
 
 578         String cvlan2 = "1006";
 
 579         String svlan = "100";
 
 580         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 582         // Prepare stubbed replies
 
             // NOTE(review): the rest of both builder chains is not visible in this excerpt.
 583         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 584                 .correlationId(pnfName1)
 
 585                 .attachmentPoint(attachmentPoint1)
 
 590         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 591                 .correlationId(pnfName2)
 
 592                 .attachmentPoint(attachmentPoint2)
 
             // Only the second PNF gets A&AI data; the first retrieval is stubbed to fail below.
 598         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
 
 599         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 600                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
 
 603         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 604                 hsiCfsServiceInstance.getServiceInstanceId());
 
 606         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 607         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 608                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // Consecutive stubbing: the first call yields the error, the second call yields the PNF.
 609         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 610                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
 
 611                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the opening line of this stubbing is not visible in this excerpt.
 613                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 614                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 616         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 617         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 619         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 620         StepVerifier.create(pipeline.executePipeline())
 
 621                 .expectSubscription()
 
 622                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 625         verify(aaiClientTask, times(2))
 
 626                 .executePnfRetrieval(anyString(), anyString());
 
 627         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 628         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 631     private PnfAaiObject constructPnfObject(String pnfName, String attachmentPoint,
 
 632                                             String hsiCfsServiceInstanceId) {
             // Test fixture: builds a PNF A&AI object carrying two relationship entries - one to
             // the service instance identified by hsiCfsServiceInstanceId and one to the logical
             // link named after attachmentPoint.
             // NOTE(review): several builder lines (including where pnfName is presumably applied
             // and the closing build() calls) are not visible in this excerpt.
 
 634         // Build Relationship Data
 
             // Entry 1: the "service-instance" relationship; its link and key data end in the given id.
 635         RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
 
 636                 ImmutableRelationshipEntryAaiObject.builder()
 
 637                         .relatedTo("service-instance")
 
 638                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 639                                 + "/service-subscription/BBS-CFS/service-instances"
 
 640                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 641                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
 642                         .relationshipData(Arrays.asList(
 
 643                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 644                                         .relationshipKey("customer.global-customer-id")
 
 645                                         .relationshipValue("Demonstration").build(),
 
 646                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 647                                         .relationshipKey("service-subscription.service-type")
 
 648                                         .relationshipValue("BBS-CFS").build(),
 
 649                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 650                                         .relationshipKey("service-instance.service-instance-id")
 
 651                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
 653                         .relatedToProperties(Collections.singletonList(
 
 654                                 ImmutablePropertyAaiObject.builder()
 
 655                                         .propertyKey("service-instance.service-instance-name")
 
 656                                         .propertyValue("bbs-instance").build())
 
             // Entry 2: the "logical-link" relationship, named after the given attachment point.
 660         RelationshipListAaiObject.RelationshipEntryAaiObject secondRelationshipEntry =
 
 661                 ImmutableRelationshipEntryAaiObject.builder()
 
 662                         .relatedTo("logical-link")
 
 663                         .relatedLink("/network/logical-links/logical-link/" + attachmentPoint)
 
 664                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 665                                 .relationshipKey("logical-link.link-name")
 
 666                                 .relationshipValue(attachmentPoint).build()))
 
 669         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 670                 .relationshipEntries(Arrays.asList(firstRelationshipEntry, secondRelationshipEntry))
 
 673         // Finally construct PNF object data
 
 674         return ImmutablePnfAaiObject.builder()
 
 676                 .isInMaintenance(true)
 
 677                 .relationshipListAaiObject(relationshipListAaiObject)
 
 681     private PnfAaiObject constructPnfObjectWithoutLogicalLink(String pnfName, String hsiCfsServiceInstanceId) {
             // Test fixture helper: assembles a PnfAaiObject whose relationship list holds exactly one
             // entry, a "service-instance" relationship that points at the given HSI CFS service
             // instance. No "logical-link" relationship entry is added (the entry list is a singleton).
             // hsiCfsServiceInstanceId is used both as the last segment of the related link and as the
             // value of the "service-instance.service-instance-id" relationship key.
             // NOTE(review): the use of pnfName is not visible in this excerpt; presumably it is set on
             // the PNF builder below. Confirm against the full file.
 
 683         // Build Relationship Data
 
 684         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 685                 ImmutableRelationshipEntryAaiObject.builder()
 
 686                         .relatedTo("service-instance")
 
                             // The link path embeds the same customer ("Demonstration") and service type
                             // ("BBS-CFS") that are repeated as relationship data values further down.
 687                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 688                                 + "/service-subscription/BBS-CFS/service-instances"
 
 689                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 690                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
                             // Three key/value pairs: customer id, service subscription type and
                             // service instance id.
 691                         .relationshipData(Arrays.asList(
 
 692                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 693                                         .relationshipKey("customer.global-customer-id")
 
 694                                         .relationshipValue("Demonstration").build(),
 
 695                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 696                                         .relationshipKey("service-subscription.service-type")
 
 697                                         .relationshipValue("BBS-CFS").build(),
 
 698                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 699                                         .relationshipKey("service-instance.service-instance-id")
 
 700                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
                             // Single related-to property carrying the service instance name "bbs-instance".
 702                         .relatedToProperties(Collections.singletonList(
 
 703                                 ImmutablePropertyAaiObject.builder()
 
 704                                         .propertyKey("service-instance.service-instance-name")
 
 705                                         .propertyValue("bbs-instance").build())
 
             // Wrap the single service-instance entry in the relationship list.
 709         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 710                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 713         // Finally construct PNF object data
 
 714         return ImmutablePnfAaiObject.builder()
 
                     // The returned PNF is flagged as being in maintenance.
 716                 .isInMaintenance(true)
 
 717                 .relationshipListAaiObject(relationshipListAaiObject)
 
 721     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
 
 724         String orchestrationStatus = "active";
 
 726         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 727                 ImmutableRelationshipEntryAaiObject.builder()
 
 729                         .relatedLink("/pnfs/pnf/" + pnfName)
 
 730                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 731                                 .relationshipKey("pnf.pnf-name")
 
 732                                 .relationshipValue(pnfName).build()))
 
 735         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 736                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 739         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
 
 740                 ImmutableMetadataEntryAaiObject.builder()
 
 745         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
 
 746                 .metadataEntries(Collections.singletonList(metadataEntry))
 
 749         // Finally construct Service Instance object data
 
 750         return ImmutableServiceInstanceAaiObject.builder()
 
 751                 .serviceInstanceId(hsiCfsServiceInstanceId)
 
 752                 .orchestrationStatus(orchestrationStatus)
 
 753                 .relationshipListAaiObject(relationshipListAaiObject)
 
 754                 .metadataListAaiObject(metadataListAaiObject)