2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.pipelines;
 
  23 import static org.junit.jupiter.api.Assertions.assertEquals;
 
  24 import static org.mockito.ArgumentMatchers.any;
 
  25 import static org.mockito.ArgumentMatchers.anyString;
 
  26 import static org.mockito.Mockito.times;
 
  27 import static org.mockito.Mockito.verify;
 
  28 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
  29 import static org.mockito.Mockito.verifyZeroInteractions;
 
  30 import static org.mockito.Mockito.when;
 
  31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_REREGISTRATION_TASK_NAME;
 
  32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
 
  33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
 
  35 import java.util.Arrays;
 
  36 import java.util.Collections;
 
  37 import java.util.HashMap;
 
  38 import java.util.UUID;
 
  40 import javax.net.ssl.SSLException;
 
  42 import org.junit.jupiter.api.BeforeEach;
 
  43 import org.junit.jupiter.api.DisplayName;
 
  44 import org.junit.jupiter.api.Test;
 
  45 import org.mockito.Mockito;
 
  46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
 
  47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
 
  48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
 
  49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
 
  50 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
 
  51 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
 
  52 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
 
  53 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
 
  54 import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
 
  55 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
 
  56 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
 
  57 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
 
  58 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
 
  59 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
 
  60 import org.onap.bbs.event.processor.model.PnfAaiObject;
 
  61 import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
 
  62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
 
  63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 
  64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 
  65 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
 
  66 import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
 
  67 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 
  68 import org.springframework.http.HttpStatus;
 
  70 import reactor.core.publisher.Flux;
 
  71 import reactor.core.publisher.Mono;
 
  72 import reactor.test.StepVerifier;
 
  74 @DisplayName("PNF Re-registration Pipeline Unit-Tests")
 
  75 class ReRegistrationPipelineTest {
 
  77     private ReRegistrationPipeline pipeline;
 
         // The fields below are replaced with fresh Mockito mocks by the fixture code further down;
         // configuration and the three tasks are the collaborators passed to the pipeline constructor.
  78     private ApplicationConfiguration configuration;
 
  79     private DmaapReRegistrationConsumerTask consumerTask;
 
  80     private DmaapPublisherTask publisherTask;
 
  81     private AaiClientTask aaiClientTask;
 
         // Mocked HTTP response that individual tests hand back from the stubbed publisher task.
  83     private HttpResponse httpResponse;
 
  88         httpResponse = Mockito.mock(HttpResponse.class);
 
             // Fixture: every collaborator is a plain Mockito mock, so each test stubs only what it needs.
             // NOTE(review): the enclosing method header is not visible in this listing.
  90         configuration = Mockito.mock(ApplicationConfiguration.class);
 
  91         consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
 
  92         publisherTask = Mockito.mock(DmaapPublisherTask.class);
 
  93         aaiClientTask = Mockito.mock(AaiClientTask.class);
 
             // Fixed close-loop / policy values returned by the mocked configuration.
  95         when(configuration.getReRegistrationCloseLoopControlName())
 
  96                 .thenReturn("controlName");
 
  97         when(configuration.getReRegistrationCloseLoopPolicyScope())
 
  98                 .thenReturn("policyScope");
 
  99         when(configuration.getPolicyVersion())
 
 100                 .thenReturn("1.0.0");
 
             // NOTE(review): the value stubbed for getCloseLoopTargetType() is missing from this listing.
 101         when(configuration.getCloseLoopTargetType())
 
 103         when(configuration.getCloseLoopEventStatus())
 
 104                 .thenReturn("ONSET");
 
 105         when(configuration.getCloseLoopVersion())
 
 106                 .thenReturn("1.0.2");
 
 107         when(configuration.getCloseLoopTarget())
 
 108                 .thenReturn("CL-Target");
 
 109         when(configuration.getCloseLoopOriginator())
 
 110                 .thenReturn("DCAE-BBS-ep");
 
             // Object under test, wired with the mocks above and an empty HashMap as its last argument.
 112         pipeline = new ReRegistrationPipeline(configuration, consumerTask,
 
 113                 publisherTask, aaiClientTask, new HashMap<>());
 
 117     void handleEmptyResponseFromDmaap() throws SSLException {
 
             // Scenario: the consumer task fails with EmptyDmaapResponseException for any task name.
             // Expected: neither the A&AI client nor the publisher is ever touched.
 119         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 120         when(consumerTask.execute(anyString()))
 
 121                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 123         StepVerifier.create(pipeline.executePipeline())
 
 124                 .expectSubscription()
 
 127         verifyZeroInteractions(aaiClientTask);
 
 128         verifyZeroInteractions(publisherTask);
 
 132     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
 
             // Scenario: the consumer task returns Flux.never(), i.e. it emits nothing and never completes,
             // while the configured pipeline timeout is stubbed to 1.
             // Expected: neither the A&AI client nor the publisher is ever touched.
 135         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 136         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 137                 .thenReturn(Flux.never());
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 140         StepVerifier.create(pipeline.executePipeline())
 
 141                 .expectSubscription()
 
 144         verifyZeroInteractions(aaiClientTask);
 
 145         verifyZeroInteractions(publisherTask);
 
 149     void noResponseFromAai_PipelineTimesOut() throws SSLException {
 
             // Scenario: one event is consumed, but the PNF lookup returns Mono.never() while the
             // configured pipeline timeout is stubbed to 1.
             // Expected: the publisher is never touched.
 151         String pnfName = "olt1";
 
 152         String attachmentPoint = "olt2-2-2";
 
 153         String remoteId = "newRemoteId";
 
 154         String cvlan = "1005";
 
 155         String svlan = "100";
 
 157         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is missing from this listing.
 158         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 159                 .correlationId(pnfName)
 
 160                 .attachmentPoint(attachmentPoint)
 
 167         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 168         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 169         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 172         StepVerifier.create(pipeline.executePipeline())
 
 173                 .expectSubscription()
 
 176         verifyZeroInteractions(publisherTask);
 
 180     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
 
             // Scenario: one event is consumed and the A&AI lookups are stubbed with data, but the
             // publisher returns Mono.never() while the configured pipeline timeout is stubbed to 1.
             // Expected: the publisher is invoked exactly once.
 182         String pnfName = "olt1";
 
 183         String attachmentPoint = "olt2-2-2";
 
 184         String remoteId = "newRemoteId";
 
 185         String cvlan = "1005";
 
 186         String svlan = "100";
 
 187         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 189         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is missing from this listing.
 190         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 191                 .correlationId(pnfName)
 
 192                 .attachmentPoint(attachmentPoint)
 
             // The stored logical link ("olt1-1-1") differs from the event's attachment point ("olt2-2-2").
 198         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
 
 199         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 200                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
             // URL used as the exact-match argument of the service-instance retrieval stub below.
 203         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 204                 hsiCfsServiceInstance.getServiceInstanceId());
 
 206         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 207         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 209         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 210                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the line opening this stub (its when(...) receiver) is missing from this listing.
 213                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 214                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 216         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
 
 218         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 219         StepVerifier.create(pipeline.executePipeline())
 
 220                 .expectSubscription()
 
 223         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 227     void singleCorrectEvent_PnfHavingNoLogicalLink_handleGracefully() throws SSLException {
 
             // Scenario: one event is consumed and the PNF returned by A&AI is built by
             // constructPnfObjectWithoutLogicalLink.
             // Expected: exactly one PNF retrieval, no other A&AI call, and no publisher interaction.
 229         String pnfName = "olt1";
 
 230         String attachmentPoint = "olt2-2-2";
 
 231         String remoteId = "newRemoteId";
 
 232         String cvlan = "1005";
 
 233         String svlan = "100";
 
 234         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 236         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is missing from this listing.
 237         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 238                 .correlationId(pnfName)
 
 239                 .attachmentPoint(attachmentPoint)
 
 245         PnfAaiObject pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
 
 246         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 247                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
 250         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 251                 hsiCfsServiceInstance.getServiceInstanceId());
 
 253         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 254         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 256         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 257                 .thenReturn(Mono.just(pnfAaiObject));
 
             // The service-instance and publisher stubs below are configured, yet the verifications at the
             // end of this test expect them never to be used.
             // NOTE(review): the line opening this stub (its when(...) receiver) is missing from this listing.
 260                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 261                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 263         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 264         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 266         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 267         StepVerifier.create(pipeline.executePipeline())
 
 268                 .expectSubscription()
 
 271         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 272         verifyNoMoreInteractions(aaiClientTask);
 
 273         verifyZeroInteractions(publisherTask);
 
 277     void singleCorrectEvent_handleSuccessfully() throws SSLException {
 
             // Scenario: one event is consumed, both A&AI lookups return data and the publisher answers
             // with a response whose status code is 200.
             // Expected: the pipeline emits one element with status 200 and the publisher is called once.
 279         String pnfName = "olt1";
 
 280         String attachmentPoint = "olt2-2-2";
 
 281         String remoteId = "newRemoteId";
 
 282         String cvlan = "1005";
 
 283         String svlan = "100";
 
 284         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 286         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is missing from this listing.
 287         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 288                 .correlationId(pnfName)
 
 289                 .attachmentPoint(attachmentPoint)
 
             // The stored logical link ("old-attachment-point") differs from the event's attachment point.
 295         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
 
 296         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 297                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
 300         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 301                 hsiCfsServiceInstance.getServiceInstanceId());
 
 303         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 304         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 306         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 307                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the line opening this stub (its when(...) receiver) is missing from this listing.
 310                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 311                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 313         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 314         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 316         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 317         StepVerifier.create(pipeline.executePipeline())
 
 318                 .expectSubscription()
 
 319                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
 322         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 326     void twoCorrectEvents_handleSuccessfully() throws SSLException {
 
             // Scenario: two events for two different PNFs are consumed; each PNF and service-instance
             // lookup is stubbed by exact URL, and the publisher answers with status 200.
             // Expected: the pipeline emits two elements with status 200 and the publisher is called twice.
 328         String pnfName1 = "olt1";
 
 329         String pnfName2 = "olt2";
 
 330         String attachmentPoint1 = "olt1-1-1";
 
 331         String attachmentPoint2 = "olt2-2-2";
 
 332         String remoteId1 = "newRemoteId1";
 
 333         String remoteId2 = "newRemoteId2";
 
 334         String cvlan1 = "1005";
 
 335         String cvlan2 = "1006";
 
 336         String svlan = "100";
 
 337         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 338         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 340         // Prepare stubbed replies
 
             // NOTE(review): the rest of both builder chains is missing from this listing.
 341         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 342                 .correlationId(pnfName1)
 
 343                 .attachmentPoint(attachmentPoint1)
 
 348         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 349                 .correlationId(pnfName2)
 
 350                 .attachmentPoint(attachmentPoint2)
 
             // Both stored logical links ("olt1-1-0", "olt2-2-0") differ from the events' attachment points.
 356         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
 
 357         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
 
 358         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 359                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
 
 360         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 361                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
 
             // Exact URLs the stubs below are keyed on.
 364         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 365         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 366         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 367                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 368         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 369                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 371         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 372         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 373                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 375         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 376         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the lines opening the next two stubs (their when(...) receivers) are missing
             // from this listing.
 379                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 380                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 382                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 383                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 385         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 386         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 388         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 389         StepVerifier.create(pipeline.executePipeline())
 
 390                 .expectSubscription()
 
 391                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
 392                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
 395         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
 
 399     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
 
             // Scenario: one event is consumed and the PNF lookup fails with AaiTaskException.
             // Expected: exactly one PNF retrieval, no other A&AI call, and no publisher interaction.
 401         String pnfName = "olt1";
 
 402         String attachmentPoint = "olt2-2-2";
 
 403         String remoteId = "newRemoteId";
 
 404         String cvlan = "1005";
 
 405         String svlan = "100";
 
 407         // Prepare stubbed replies
 
             // NOTE(review): the rest of this builder chain is missing from this listing.
 408         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 409                 .correlationId(pnfName)
 
 410                 .attachmentPoint(attachmentPoint)
 
 417         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 418         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 419         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 420                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
 422         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 423         StepVerifier.create(pipeline.executePipeline())
 
 424                 .expectSubscription()
 
 427         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 428         verifyNoMoreInteractions(aaiClientTask);
 
 429         verifyZeroInteractions(publisherTask);
 
 433     void twoEvents_FirstOk_SecondNotRelocation_handleCorrectOnly() throws SSLException {
 
             // Scenario: two events are consumed. For the first PNF the stored logical link differs from
             // the event's attachment point; for the second PNF it is identical to it.
             // Expected: the pipeline emits a single element with status 200 and the publisher is called once.
 435         String pnfName1 = "olt1";
 
 436         String pnfName2 = "olt2";
 
 437         String attachmentPoint1 = "olt1-1-1";
 
 438         String attachmentPoint2 = "olt2-2-2";
 
 439         String remoteId1 = "newRemoteId1";
 
 440         String remoteId2 = "newRemoteId2";
 
 441         String cvlan1 = "1005";
 
 442         String cvlan2 = "1006";
 
 443         String svlan = "100";
 
 444         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 445         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 447         // Prepare stubbed replies
 
             // NOTE(review): the rest of both builder chains is missing from this listing.
 448         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 449                 .correlationId(pnfName1)
 
 450                 .attachmentPoint(attachmentPoint1)
 
 455         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 456                 .correlationId(pnfName2)
 
 457                 .attachmentPoint(attachmentPoint2)
 
             // First PNF: stored link "olt1-1-0" vs. event "olt1-1-1". Second PNF: stored link is
             // attachmentPoint2 itself, i.e. the same value the second event carries.
 463         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
 
 464         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
 
 465         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 466                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
 
 467         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 468                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
 
             // Exact URLs the stubs below are keyed on.
 471         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 472         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 473         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 474                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 475         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 476                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 478         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 479         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 480                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 482         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 483         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the lines opening the next two stubs (their when(...) receivers) are missing
             // from this listing.
 486                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 487                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 489                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 490                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 492         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 493         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 495         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 496         StepVerifier.create(pipeline.executePipeline())
 
 497                 .expectSubscription()
 
 498                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
 501         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 505     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
 
             // Scenario: two events are consumed; the first PNF lookup returns data and the second one
             // fails with AaiTaskException.
             // Expected: one element with status 200, two PNF retrievals, one service-instance retrieval
             // and one publisher call.
 507         String pnfName1 = "olt1";
 
 508         String pnfName2 = "olt2";
 
 509         String attachmentPoint1 = "olt1-1-1";
 
 510         String attachmentPoint2 = "olt2-2-2";
 
 511         String remoteId1 = "newRemoteId1";
 
 512         String remoteId2 = "newRemoteId2";
 
 513         String cvlan1 = "1005";
 
 514         String cvlan2 = "1006";
 
 515         String svlan = "100";
 
 516         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 518         // Prepare stubbed replies
 
             // NOTE(review): the rest of both builder chains is missing from this listing.
 519         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 520                 .correlationId(pnfName1)
 
 521                 .attachmentPoint(attachmentPoint1)
 
 526         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 527                 .correlationId(pnfName2)
 
 528                 .attachmentPoint(attachmentPoint2)
 
             // Only the first PNF gets A&AI data; its stored link differs from the first event's attachment point.
 534         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
 
 535         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 536                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
 
 539         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 540                 hsiCfsServiceInstance.getServiceInstanceId());
 
 542         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 543         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 544                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // Consecutive stubbing: the first invocation returns the PNF, the next one returns an error Mono.
 545         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 546                 .thenReturn(Mono.just(pnfAaiObject))
 
 547                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
             // NOTE(review): the line opening this stub (its when(...) receiver) is missing from this listing.
 549                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 550                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 552         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 553         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 555         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 556         StepVerifier.create(pipeline.executePipeline())
 
 557                 .expectSubscription()
 
 558                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
 561         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
 
 562         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 563         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 567     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
 
             // Scenario: two events are consumed; the first PNF lookup fails with AaiTaskException and
             // the second one returns data.
             // Expected: one element with status 200, two PNF retrievals, one service-instance retrieval
             // and one publisher call.
 569         String pnfName1 = "olt1";
 
 570         String pnfName2 = "olt2";
 
 571         String attachmentPoint1 = "olt1-1-1";
 
 572         String attachmentPoint2 = "olt2-2-2";
 
 573         String remoteId1 = "newRemoteId1";
 
 574         String remoteId2 = "newRemoteId2";
 
 575         String cvlan1 = "1005";
 
 576         String cvlan2 = "1006";
 
 577         String svlan = "100";
 
 578         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 580         // Prepare stubbed replies
 
             // NOTE(review): the rest of both builder chains is missing from this listing.
 581         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 582                 .correlationId(pnfName1)
 
 583                 .attachmentPoint(attachmentPoint1)
 
 588         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 589                 .correlationId(pnfName2)
 
 590                 .attachmentPoint(attachmentPoint2)
 
             // Only the second PNF gets A&AI data; its stored link differs from the second event's attachment point.
 596         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
 
 597         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 598                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
 
 601         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 602                 hsiCfsServiceInstance.getServiceInstanceId());
 
 604         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 605         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 606                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // Consecutive stubbing: the first invocation returns an error Mono, the next one returns the PNF.
 607         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 608                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
 
 609                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the line opening this stub (its when(...) receiver) is missing from this listing.
 611                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 612                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 614         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 615         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 617         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible in this listing.
 618         StepVerifier.create(pipeline.executePipeline())
 
 619                 .expectSubscription()
 
 620                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
 623         verify(aaiClientTask, times(2))
 
 624                 .executePnfRetrieval(anyString(), anyString());
 
 625         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 626         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 629     private PnfAaiObject constructPnfObject(String pnfName, String attachmentPoint,
 
 630                                             String hsiCfsServiceInstanceId) {
 
             // Test-data helper: builds a PNF A&AI object carrying two relationship entries, one to a
             // "service-instance" (identified by hsiCfsServiceInstanceId) and one to a "logical-link"
             // (identified by attachmentPoint).
             // NOTE(review): several builder lines (e.g. the .build() calls and whatever consumes pnfName)
             // are missing from this listing.
 632         // Build Relationship Data
 
             // Entry 1: relationship to the HSI CFS service instance; the related link ends with the instance id.
 633         RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
 
 634                 ImmutableRelationshipEntryAaiObject.builder()
 
 635                         .relatedTo("service-instance")
 
 636                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 637                                 + "/service-subscription/BBS-CFS/service-instances"
 
 638                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 639                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
 640                         .relationshipData(Arrays.asList(
 
 641                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 642                                         .relationshipKey("customer.global-customer-id")
 
 643                                         .relationshipValue("Demonstration").build(),
 
 644                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 645                                         .relationshipKey("service-subscription.service-type")
 
 646                                         .relationshipValue("BBS-CFS").build(),
 
 647                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 648                                         .relationshipKey("service-instance.service-instance-id")
 
 649                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
 651                         .relatedToProperties(Collections.singletonList(
 
 652                                 ImmutablePropertyAaiObject.builder()
 
 653                                         .propertyKey("service-instance.service-instance-name")
 
 654                                         .propertyValue("bbs-instance").build())
 
             // Entry 2: relationship to the logical link; attachmentPoint is used both in the link path
             // and as the "logical-link.link-name" value.
 658         RelationshipListAaiObject.RelationshipEntryAaiObject secondRelationshipEntry =
 
 659                 ImmutableRelationshipEntryAaiObject.builder()
 
 660                         .relatedTo("logical-link")
 
 661                         .relatedLink("/network/logical-links/logical-link/" + attachmentPoint)
 
 662                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 663                                 .relationshipKey("logical-link.link-name")
 
 664                                 .relationshipValue(attachmentPoint).build()))
 
 667         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 668                 .relationshipEntries(Arrays.asList(firstRelationshipEntry, secondRelationshipEntry))
 
 671         // Finally construct PNF object data
 
             // The returned PNF is flagged as in maintenance and carries the relationship list built above.
 672         return ImmutablePnfAaiObject.builder()
 
 674                 .isInMaintenance(true)
 
 675                 .relationshipListAaiObject(relationshipListAaiObject)
 
 /**
  * Builds a PNF test fixture whose relationship list contains exactly one entry: a
  * "service-instance" relationship pointing at the given HSI CFS service instance.
  * No "logical-link" relationship entry is added.
  *
  * @param pnfName name of the PNF.
  *        NOTE(review): the statement consuming this parameter is not visible in this
  *        excerpt; presumably it is set on the returned PNF object - confirm.
  * @param hsiCfsServiceInstanceId service instance ID; appended to the related link and used as
  *        the value of the "service-instance.service-instance-id" relationship data entry
  * @return a PNF object flagged as in maintenance and carrying the single-entry relationship list
  */
 679     private PnfAaiObject constructPnfObjectWithoutLogicalLink(String pnfName, String hsiCfsServiceInstanceId) {
 
 681         // Build Relationship Data: one "service-instance" entry describing the HSI CFS service instance
 
 682         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 683                 ImmutableRelationshipEntryAaiObject.builder()
 
 684                         .relatedTo("service-instance")
 
                             // The link hard-codes the same customer ("Demonstration") and service type
                             // ("BBS-CFS") that the relationship data entries below carry.
 685                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 686                                 + "/service-subscription/BBS-CFS/service-instances"
 
 687                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 688                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
                             // Three key/value pairs: customer ID, service type and service instance ID.
 689                         .relationshipData(Arrays.asList(
 
 690                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 691                                         .relationshipKey("customer.global-customer-id")
 
 692                                         .relationshipValue("Demonstration").build(),
 
 693                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 694                                         .relationshipKey("service-subscription.service-type")
 
 695                                         .relationshipValue("BBS-CFS").build(),
 
 696                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 697                                         .relationshipKey("service-instance.service-instance-id")
 
 698                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
 700                         .relatedToProperties(Collections.singletonList(
 
 701                                 ImmutablePropertyAaiObject.builder()
 
 702                                         .propertyKey("service-instance.service-instance-name")
 
 703                                         .propertyValue("bbs-instance").build())
 
             // The relationship list holds only the service-instance entry built above.
 707         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 708                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 711         // Finally construct PNF object data (marked as in maintenance, with the relationship list attached)
 
 712         return ImmutablePnfAaiObject.builder()
 
 714                 .isInMaintenance(true)
 
 715                 .relationshipListAaiObject(relationshipListAaiObject)
 
 719     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
 
 722         String orchestrationStatus = "active";
 
 724         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 725                 ImmutableRelationshipEntryAaiObject.builder()
 
 727                         .relatedLink("/pnfs/pnf/" + pnfName)
 
 728                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 729                                 .relationshipKey("pnf.pnf-name")
 
 730                                 .relationshipValue(pnfName).build()))
 
 733         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 734                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 737         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
 
 738                 ImmutableMetadataEntryAaiObject.builder()
 
 743         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
 
 744                 .metadataEntries(Collections.singletonList(metadataEntry))
 
 747         // Finally construct Service Instance object data
 
 748         return ImmutableServiceInstanceAaiObject.builder()
 
 749                 .serviceInstanceId(hsiCfsServiceInstanceId)
 
 750                 .orchestrationStatus(orchestrationStatus)
 
 751                 .relationshipListAaiObject(relationshipListAaiObject)
 
 752                 .metadataListAaiObject(metadataListAaiObject)