2  * ============LICENSE_START=======================================================
 
   3  * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.bbs.event.processor.pipelines;
 
  23 import static org.junit.jupiter.api.Assertions.assertEquals;
 
  24 import static org.mockito.ArgumentMatchers.any;
 
  25 import static org.mockito.ArgumentMatchers.anyString;
 
  26 import static org.mockito.Mockito.times;
 
  27 import static org.mockito.Mockito.verify;
 
  28 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
  29 import static org.mockito.Mockito.verifyZeroInteractions;
 
  30 import static org.mockito.Mockito.when;
 
  31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_REREGISTRATION_TASK_NAME;
 
  32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
 
  33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
 
  35 import java.util.Arrays;
 
  36 import java.util.Collections;
 
  37 import java.util.HashMap;
 
  38 import java.util.UUID;
 
  40 import javax.net.ssl.SSLException;
 
  42 import org.junit.jupiter.api.BeforeEach;
 
  43 import org.junit.jupiter.api.DisplayName;
 
  44 import org.junit.jupiter.api.Test;
 
  45 import org.mockito.Mockito;
 
  46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
 
  47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
 
  48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
 
  49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
 
  50 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
 
  51 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
 
  52 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
 
  53 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
 
  54 import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel;
 
  55 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
 
  56 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
 
  57 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
 
  58 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
 
  59 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
 
  60 import org.onap.bbs.event.processor.model.PnfAaiObject;
 
  61 import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel;
 
  62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
 
  63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 
  64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 
  65 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
 
  66 import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
 
  67 import org.springframework.http.HttpStatus;
 
  68 import org.springframework.http.ResponseEntity;
 
  70 import reactor.core.publisher.Flux;
 
  71 import reactor.core.publisher.Mono;
 
  72 import reactor.test.StepVerifier;
 
  74 // We can safely suppress unchecked assignment warnings for the ResponseEntity mock
 
  75 @SuppressWarnings("unchecked")
 
  76 @DisplayName("PNF Re-registration Pipeline Unit-Tests")
 
  77 class ReRegistrationPipelineTest {
         // Fixture state: the pipeline under test plus the collaborators passed to its constructor.
 
  79     private ReRegistrationPipeline pipeline;
 
  80     private ApplicationConfiguration configuration;
 
  81     private DmaapReRegistrationConsumerTask consumerTask;
 
  82     private DmaapPublisherTask publisherTask;
 
  83     private AaiClientTask aaiClientTask;
 
         // Mocked publisher reply; the raw ResponseEntity mock is why unchecked warnings are suppressed.
  85     private ResponseEntity<String> responseEntity;
 
             // NOTE(review): the header of the method enclosing the statements below is not visible in this
             // excerpt - presumably a per-test setup method; confirm against the full file.
             // Every collaborator is replaced by a Mockito mock.
  90         responseEntity = Mockito.mock(ResponseEntity.class);
 
  92         configuration = Mockito.mock(ApplicationConfiguration.class);
 
  93         consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
 
  94         publisherTask = Mockito.mock(DmaapPublisherTask.class);
 
  95         aaiClientTask = Mockito.mock(AaiClientTask.class);
 
             // Fixed close-loop control name and policy scope returned by the mocked configuration.
  97         when(configuration.getReRegistrationCloseLoopControlName())
 
  98                 .thenReturn("controlName");
 
  99         when(configuration.getReRegistrationCloseLoopPolicyScope())
 
 100                 .thenReturn("policyScope");
 
             // The pipeline is built from the mocks and a new, empty HashMap.
 102         pipeline = new ReRegistrationPipeline(configuration, consumerTask,
 
 103                 publisherTask, aaiClientTask, new HashMap<>());
 
 107     void handleEmptyResponseFromDmaap() throws SSLException {
             // Scenario: the consumer task emits an EmptyDmaapResponseException for any task name,
             // with the pipeline timeout stubbed to 10 seconds.
             // Afterwards neither the A&AI client nor the publisher may have been touched.
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 
 109         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 110         when(consumerTask.execute(anyString()))
 
 111                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
 
 113         StepVerifier.create(pipeline.executePipeline())
 
 114                 .expectSubscription()
 
 117         verifyZeroInteractions(aaiClientTask);
 
 118         verifyZeroInteractions(publisherTask);
 
 122     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
             // Scenario: the consumer task returns Flux.never(), so it never emits or completes,
             // while the pipeline timeout is stubbed to 1 second.
             // Afterwards neither the A&AI client nor the publisher may have been touched.
             // NOTE(review): the terminal StepVerifier expectation is not visible in this excerpt.
 
 125         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 126         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 127                 .thenReturn(Flux.never());
 
 130         StepVerifier.create(pipeline.executePipeline())
 
 131                 .expectSubscription()
 
 134         verifyZeroInteractions(aaiClientTask);
 
 135         verifyZeroInteractions(publisherTask);
 
 139     void noResponseFromAai_PipelineTimesOut() throws SSLException {
             // Scenario: one event is consumed, but the PNF retrieval returns Mono.never(),
             // while the pipeline timeout is stubbed to 1 second.
             // Afterwards the publisher may not have been touched.
             // NOTE(review): the remaining builder calls (where remoteId/cvlan/svlan are presumably
             // used) and the terminal StepVerifier expectation are not visible in this excerpt.
 
 141         String pnfName = "olt1";
 
 142         String attachmentPoint = "olt2-2-2";
 
 143         String remoteId = "newRemoteId";
 
 144         String cvlan = "1005";
 
 145         String svlan = "100";
 
 147         // Prepare stubbed replies
 
 148         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 149                 .correlationId(pnfName)
 
 150                 .attachmentPoint(attachmentPoint)
 
 157         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 158         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 159         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
 
 162         StepVerifier.create(pipeline.executePipeline())
 
 163                 .expectSubscription()
 
 166         verifyZeroInteractions(publisherTask);
 
 170     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
             // Scenario: both A&AI lookups answer, but the publisher returns Mono.never(),
             // while the pipeline timeout is stubbed to 1 second.
             // The publisher must still have been invoked (verify() defaults to exactly once).
             // NOTE(review): the remaining builder calls and the terminal StepVerifier expectation
             // are not visible in this excerpt.
 
 172         String pnfName = "olt1";
 
 173         String attachmentPoint = "olt2-2-2";
 
 174         String remoteId = "newRemoteId";
 
 175         String cvlan = "1005";
 
 176         String svlan = "100";
 
 177         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 179         // Prepare stubbed replies
 
 180         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 181                 .correlationId(pnfName)
 
 182                 .attachmentPoint(attachmentPoint)
 
             // The stored PNF is linked to "olt1-1-1", which differs from the event's "olt2-2-2".
 188         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "olt1-1-1", hsiCfsServiceInstanceId);
 
 189         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 190                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
             // URL that the service-instance stubbing below is keyed on.
 193         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 194                 hsiCfsServiceInstance.getServiceInstanceId());
 
 196         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 197         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 199         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 200                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the line opening this stubbing is not visible in this excerpt.
 203                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 204                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 206         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
 
 208         // Execute the pipeline
 
 209         StepVerifier.create(pipeline.executePipeline())
 
 210                 .expectSubscription()
 
 213         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 217     void singleCorrectEvent_PnfHavingNoLogicalLink_handleGracefully() throws SSLException {
             // Scenario: the PNF returned by A&AI carries only the service-instance relationship
             // (built by constructPnfObjectWithoutLogicalLink), with no logical-link entry.
             // Expected: exactly one PNF retrieval, no further A&AI calls, and no publisher interaction.
             // NOTE(review): the remaining builder calls and the terminal StepVerifier expectation
             // are not visible in this excerpt.
 
 219         String pnfName = "olt1";
 
 220         String attachmentPoint = "olt2-2-2";
 
 221         String remoteId = "newRemoteId";
 
 222         String cvlan = "1005";
 
 223         String svlan = "100";
 
 224         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 226         // Prepare stubbed replies
 
 227         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 228                 .correlationId(pnfName)
 
 229                 .attachmentPoint(attachmentPoint)
 
 235         PnfAaiObject pnfAaiObject = constructPnfObjectWithoutLogicalLink(pnfName, hsiCfsServiceInstanceId);
 
 236         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 237                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
 240         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 241                 hsiCfsServiceInstance.getServiceInstanceId());
 
 243         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 244         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 246         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 247                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the line opening this stubbing is not visible in this excerpt.
 250                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 251                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
             // The service-instance and publisher stubs are set up, but the verifications below
             // require that neither is reached.
 253         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 254         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 256         // Execute the pipeline
 
 257         StepVerifier.create(pipeline.executePipeline())
 
 258                 .expectSubscription()
 
 261         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 262         verifyNoMoreInteractions(aaiClientTask);
 
 263         verifyZeroInteractions(publisherTask);
 
 267     void singleCorrectEvent_handleSuccessfully() throws SSLException {
             // Scenario: one event whose attachment point ("olt2-2-2") differs from the logical link
             // stored on the PNF ("old-attachment-point"); every collaborator answers.
             // Expected: one emitted response with status OK, and the publisher invoked once.
             // NOTE(review): the remaining builder calls and the end of the StepVerifier chain
             // are not visible in this excerpt.
 
 269         String pnfName = "olt1";
 
 270         String attachmentPoint = "olt2-2-2";
 
 271         String remoteId = "newRemoteId";
 
 272         String cvlan = "1005";
 
 273         String svlan = "100";
 
 274         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 276         // Prepare stubbed replies
 
 277         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 278                 .correlationId(pnfName)
 
 279                 .attachmentPoint(attachmentPoint)
 
 285         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, "old-attachment-point", hsiCfsServiceInstanceId);
 
 286         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 287                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, cvlan);
 
             // URL that the service-instance stubbing below is keyed on.
 290         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 291                 hsiCfsServiceInstance.getServiceInstanceId());
 
 293         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 294         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 296         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 297                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the line opening this stubbing is not visible in this excerpt.
 300                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 301                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
             // The publisher answers with the mocked ResponseEntity reporting HTTP 200.
 303         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 304         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 306         // Execute the pipeline
 
 307         StepVerifier.create(pipeline.executePipeline())
 
 308                 .expectSubscription()
 
 309                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 312         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 316     void twoCorrectEvents_handleSuccessfully() throws SSLException {
             // Scenario: two events for two different PNFs, each carrying an attachment point that
             // differs from the logical link stored on its PNF ("olt1-1-0" / "olt2-2-0").
             // Expected: two emitted responses with status OK, and the publisher invoked twice.
             // NOTE(review): the remaining builder calls and the end of the StepVerifier chain
             // are not visible in this excerpt.
 
 318         String pnfName1 = "olt1";
 
 319         String pnfName2 = "olt2";
 
 320         String attachmentPoint1 = "olt1-1-1";
 
 321         String attachmentPoint2 = "olt2-2-2";
 
 322         String remoteId1 = "newRemoteId1";
 
 323         String remoteId2 = "newRemoteId2";
 
 324         String cvlan1 = "1005";
 
 325         String cvlan2 = "1006";
 
 326         String svlan = "100";
 
 327         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 328         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 330         // Prepare stubbed replies
 
 331         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 332                 .correlationId(pnfName1)
 
 333                 .attachmentPoint(attachmentPoint1)
 
 338         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 339                 .correlationId(pnfName2)
 
 340                 .attachmentPoint(attachmentPoint2)
 
 346         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
 
 347         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, "olt2-2-0", hsiCfsServiceInstanceId2);
 
 348         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 349                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
 
 350         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 351                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
 
             // Exact URLs the stubbings below are keyed on, so each PNF / service instance gets its own reply.
 354         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 355         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 356         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 357                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 358         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 359                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 361         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 362         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 363                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 365         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 366         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the lines opening the two stubbings below are not visible in this excerpt.
 369                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 370                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 372                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 373                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
             // The publisher answers every call with the same mocked ResponseEntity reporting HTTP 200.
 375         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 376         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 378         // Execute the pipeline
 
 379         StepVerifier.create(pipeline.executePipeline())
 
 380                 .expectSubscription()
 
 381                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 382                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 385         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
 
 389     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
             // Scenario: one event is consumed, but the PNF retrieval fails with an AaiTaskException.
             // Expected: exactly one PNF retrieval, no further A&AI calls, and no publisher interaction.
             // NOTE(review): the remaining builder calls and the terminal StepVerifier expectation
             // are not visible in this excerpt.
 
 391         String pnfName = "olt1";
 
 392         String attachmentPoint = "olt2-2-2";
 
 393         String remoteId = "newRemoteId";
 
 394         String cvlan = "1005";
 
 395         String svlan = "100";
 
 397         // Prepare stubbed replies
 
 398         ReRegistrationConsumerDmaapModel event = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 399                 .correlationId(pnfName)
 
 400                 .attachmentPoint(attachmentPoint)
 
 407         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 408         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 409         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 410                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
 412         // Execute the pipeline
 
 413         StepVerifier.create(pipeline.executePipeline())
 
 414                 .expectSubscription()
 
 417         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 418         verifyNoMoreInteractions(aaiClientTask);
 
 419         verifyZeroInteractions(publisherTask);
 
 423     void twoEvents_FirstOk_SecondNotRelocation_handleCorrectOnly() throws SSLException {
             // Scenario: two events. The first carries an attachment point ("olt1-1-1") that differs
             // from the link stored on its PNF ("olt1-1-0"); the second carries the same attachment
             // point that is already stored on its PNF.
             // Expected: a single emitted response with status OK, and the publisher invoked once.
             // NOTE(review): the remaining builder calls and the end of the StepVerifier chain
             // are not visible in this excerpt.
 
 425         String pnfName1 = "olt1";
 
 426         String pnfName2 = "olt2";
 
 427         String attachmentPoint1 = "olt1-1-1";
 
 428         String attachmentPoint2 = "olt2-2-2";
 
 429         String remoteId1 = "newRemoteId1";
 
 430         String remoteId2 = "newRemoteId2";
 
 431         String cvlan1 = "1005";
 
 432         String cvlan2 = "1006";
 
 433         String svlan = "100";
 
 434         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 435         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 437         // Prepare stubbed replies
 
 438         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 439                 .correlationId(pnfName1)
 
 440                 .attachmentPoint(attachmentPoint1)
 
 445         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 446                 .correlationId(pnfName2)
 
 447                 .attachmentPoint(attachmentPoint2)
 
 453         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, "olt1-1-0", hsiCfsServiceInstanceId1);
 
             // The second PNF is stored with exactly the attachment point its event reports.
 454         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, attachmentPoint2, hsiCfsServiceInstanceId2);
 
 455         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 456                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, cvlan1);
 
 457         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 458                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, cvlan2);
 
             // Exact URLs the stubbings below are keyed on.
 461         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 462         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 463         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 464                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 465         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 466                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 468         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 469         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 470                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 472         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 473         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the lines opening the two stubbings below are not visible in this excerpt.
 476                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 477                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 479                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 480                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 482         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 483         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 485         // Execute the pipeline
 
 486         StepVerifier.create(pipeline.executePipeline())
 
 487                 .expectSubscription()
 
 488                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 491         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 495     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
             // Scenario: two events; the first PNF retrieval call succeeds and the second call fails
             // with an AaiTaskException (consecutive stubbing on the same matcher).
             // Expected: one emitted response with status OK, two PNF retrievals, one service-instance
             // retrieval, and one publish.
             // NOTE(review): the remaining builder calls and the end of the StepVerifier chain
             // are not visible in this excerpt.
 
 497         String pnfName1 = "olt1";
 
 498         String pnfName2 = "olt2";
 
 499         String attachmentPoint1 = "olt1-1-1";
 
 500         String attachmentPoint2 = "olt2-2-2";
 
 501         String remoteId1 = "newRemoteId1";
 
 502         String remoteId2 = "newRemoteId2";
 
 503         String cvlan1 = "1005";
 
 504         String cvlan2 = "1006";
 
 505         String svlan = "100";
 
 506         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 508         // Prepare stubbed replies
 
 509         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 510                 .correlationId(pnfName1)
 
 511                 .attachmentPoint(attachmentPoint1)
 
 516         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 517                 .correlationId(pnfName2)
 
 518                 .attachmentPoint(attachmentPoint2)
 
             // Only the first event's PNF and service instance are prepared; the second lookup errors out.
 524         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, "old-attachment-point", hsiCfsServiceInstanceId);
 
 525         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 526                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, cvlan1);
 
 529         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 530                 hsiCfsServiceInstance.getServiceInstanceId());
 
 532         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 533         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 534                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // First invocation returns the PNF, the next invocation returns an error Mono.
 535         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 536                 .thenReturn(Mono.just(pnfAaiObject))
 
 537                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
             // NOTE(review): the line opening this stubbing is not visible in this excerpt.
 539                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 540                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 542         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 543         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 545         // Execute the pipeline
 
 546         StepVerifier.create(pipeline.executePipeline())
 
 547                 .expectSubscription()
 
 548                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 551         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
 
 552         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 553         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 557     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
             // Scenario: two events; the first PNF retrieval call fails with an AaiTaskException and
             // the second call succeeds (consecutive stubbing on the same matcher).
             // Expected: one emitted response with status OK, two PNF retrievals, one service-instance
             // retrieval, and one publish.
             // NOTE(review): the remaining builder calls and the end of the StepVerifier chain
             // are not visible in this excerpt.
 
 559         String pnfName1 = "olt1";
 
 560         String pnfName2 = "olt2";
 
 561         String attachmentPoint1 = "olt1-1-1";
 
 562         String attachmentPoint2 = "olt2-2-2";
 
 563         String remoteId1 = "newRemoteId1";
 
 564         String remoteId2 = "newRemoteId2";
 
 565         String cvlan1 = "1005";
 
 566         String cvlan2 = "1006";
 
 567         String svlan = "100";
 
 568         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 570         // Prepare stubbed replies
 
 571         ReRegistrationConsumerDmaapModel firstEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 572                 .correlationId(pnfName1)
 
 573                 .attachmentPoint(attachmentPoint1)
 
 578         ReRegistrationConsumerDmaapModel secondEvent = ImmutableReRegistrationConsumerDmaapModel.builder()
 
 579                 .correlationId(pnfName2)
 
 580                 .attachmentPoint(attachmentPoint2)
 
             // Only the second event's PNF and service instance are prepared; the first lookup errors out.
 586         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, "old-attachment-point", hsiCfsServiceInstanceId);
 
 587         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 588                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, cvlan2);
 
 591         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 592                 hsiCfsServiceInstance.getServiceInstanceId());
 
 594         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 595         when(consumerTask.execute(CONSUME_REREGISTRATION_TASK_NAME))
 
 596                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // First invocation returns an error Mono, the next invocation returns the PNF.
 597         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 598                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
 
 599                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the line opening this stubbing is not visible in this excerpt.
 601                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 602                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 604         when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
 
 605         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
 
 607         // Execute the pipeline
 
 608         StepVerifier.create(pipeline.executePipeline())
 
 609                 .expectSubscription()
 
 610                 .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
 
 613         verify(aaiClientTask, times(2))
 
 614                 .executePnfRetrieval(anyString(), anyString());
 
 615         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 616         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 619     private PnfAaiObject constructPnfObject(String pnfName, String attachmentPoint,
 
 620                                             String hsiCfsServiceInstanceId) {
             // Test-data helper: builds a PNF with two relationship entries - a "service-instance"
             // relation pointing at hsiCfsServiceInstanceId and a "logical-link" relation whose link
             // name is attachmentPoint - and with isInMaintenance set to true.
             // NOTE(review): several builder calls (including the .build() terminators and where
             // pnfName is used) are not visible in this excerpt.
 
 622         // Build Relationship Data
 
 623         RelationshipListAaiObject.RelationshipEntryAaiObject firstRelationshipEntry =
 
 624                 ImmutableRelationshipEntryAaiObject.builder()
 
 625                         .relatedTo("service-instance")
 
 626                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 627                                 + "/service-subscription/BBS-CFS/service-instances"
 
 628                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 629                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
 630                         .relationshipData(Arrays.asList(
 
 631                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 632                                         .relationshipKey("customer.global-customer-id")
 
 633                                         .relationshipValue("Demonstration").build(),
 
 634                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 635                                         .relationshipKey("service-subscription.service-type")
 
 636                                         .relationshipValue("BBS-CFS").build(),
 
 637                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 638                                         .relationshipKey("service-instance.service-instance-id")
 
 639                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
 641                         .relatedToProperties(Collections.singletonList(
 
 642                                 ImmutablePropertyAaiObject.builder()
 
 643                                         .propertyKey("service-instance.service-instance-name")
 
 644                                         .propertyValue("bbs-instance").build())
 
             // Second entry: the logical link, carrying attachmentPoint in both the link and its data.
 648         RelationshipListAaiObject.RelationshipEntryAaiObject secondRelationshipEntry =
 
 649                 ImmutableRelationshipEntryAaiObject.builder()
 
 650                         .relatedTo("logical-link")
 
 651                         .relatedLink("/network/logical-links/logical-link/" + attachmentPoint)
 
 652                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 653                                 .relationshipKey("logical-link.link-name")
 
 654                                 .relationshipValue(attachmentPoint).build()))
 
 657         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 658                 .relationshipEntries(Arrays.asList(firstRelationshipEntry, secondRelationshipEntry))
 
 661         // Finally construct PNF object data
 
 662         return ImmutablePnfAaiObject.builder()
 
 664                 .isInMaintenance(true)
 
 665                 .relationshipListAaiObject(relationshipListAaiObject)
 
 669     private PnfAaiObject constructPnfObjectWithoutLogicalLink(String pnfName, String hsiCfsServiceInstanceId) {
             // Test-data helper: builds a PNF whose relationship list holds only the "service-instance"
             // relation pointing at hsiCfsServiceInstanceId (no "logical-link" entry), with
             // isInMaintenance set to true.
             // NOTE(review): several builder calls (including the .build() terminators and where
             // pnfName is used) are not visible in this excerpt.
 
 671         // Build Relationship Data
 
 672         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 673                 ImmutableRelationshipEntryAaiObject.builder()
 
 674                         .relatedTo("service-instance")
 
 675                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 676                                 + "/service-subscription/BBS-CFS/service-instances"
 
 677                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 678                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
 679                         .relationshipData(Arrays.asList(
 
 680                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 681                                         .relationshipKey("customer.global-customer-id")
 
 682                                         .relationshipValue("Demonstration").build(),
 
 683                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 684                                         .relationshipKey("service-subscription.service-type")
 
 685                                         .relationshipValue("BBS-CFS").build(),
 
 686                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 687                                         .relationshipKey("service-instance.service-instance-id")
 
 688                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
 690                         .relatedToProperties(Collections.singletonList(
 
 691                                 ImmutablePropertyAaiObject.builder()
 
 692                                         .propertyKey("service-instance.service-instance-name")
 
 693                                         .propertyValue("bbs-instance").build())
 
             // The single service-instance entry is the whole relationship list.
 697         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 698                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 701         // Finally construct PNF object data
 
 702         return ImmutablePnfAaiObject.builder()
 
 704                 .isInMaintenance(true)
 
 705                 .relationshipListAaiObject(relationshipListAaiObject)
 
 709     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
 
             // Test-data helper: builds a service instance with the given id, orchestration status
             // "active", one relationship entry pointing at the PNF, and one metadata entry.
             // NOTE(review): the rest of the parameter list and the metadata entry's fields are not
             // visible in this excerpt; call sites pass (serviceInstanceId, pnfName, cvlan).
 712         String orchestrationStatus = "active";
 
 714         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 715                 ImmutableRelationshipEntryAaiObject.builder()
 
 717                         .relatedLink("/pnfs/pnf/" + pnfName)
 
 718                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 719                                 .relationshipKey("pnf.pnf-name")
 
 720                                 .relationshipValue(pnfName).build()))
 
 723         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 724                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
             // NOTE(review): presumably the cvlan argument is stored in this metadata entry - confirm.
 727         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
 
 728                 ImmutableMetadataEntryAaiObject.builder()
 
 733         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
 
 734                 .metadataEntries(Collections.singletonList(metadataEntry))
 
 737         // Finally construct Service Instance object data
 
 738         return ImmutableServiceInstanceAaiObject.builder()
 
 739                 .serviceInstanceId(hsiCfsServiceInstanceId)
 
 740                 .orchestrationStatus(orchestrationStatus)
 
 741                 .relationshipListAaiObject(relationshipListAaiObject)
 
 742                 .metadataListAaiObject(metadataListAaiObject)