/*
 * ============LICENSE_START=======================================================
 * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER
 * ================================================================================
 * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
 
  21 package org.onap.bbs.event.processor.pipelines;
 
  23 import static org.junit.jupiter.api.Assertions.assertEquals;
 
  24 import static org.mockito.ArgumentMatchers.any;
 
  25 import static org.mockito.ArgumentMatchers.anyString;
 
  26 import static org.mockito.Mockito.times;
 
  27 import static org.mockito.Mockito.verify;
 
  28 import static org.mockito.Mockito.verifyNoMoreInteractions;
 
  29 import static org.mockito.Mockito.verifyZeroInteractions;
 
  30 import static org.mockito.Mockito.when;
 
  31 import static org.onap.bbs.event.processor.config.ApplicationConstants.CONSUME_CPE_AUTHENTICATION_TASK_NAME;
 
  32 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME;
 
  33 import static org.onap.bbs.event.processor.config.ApplicationConstants.RETRIEVE_PNF_TASK_NAME;
 
  35 import java.util.Arrays;
 
  36 import java.util.Collections;
 
  37 import java.util.HashMap;
 
  38 import java.util.UUID;
 
  40 import javax.net.ssl.SSLException;
 
  42 import org.junit.jupiter.api.BeforeEach;
 
  43 import org.junit.jupiter.api.DisplayName;
 
  44 import org.junit.jupiter.api.Test;
 
  45 import org.mockito.Mockito;
 
  46 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
 
  47 import org.onap.bbs.event.processor.exceptions.AaiTaskException;
 
  48 import org.onap.bbs.event.processor.exceptions.EmptyDmaapResponseException;
 
  49 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
 
  50 import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel;
 
  51 import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel;
 
  52 import org.onap.bbs.event.processor.model.ImmutableMetadataEntryAaiObject;
 
  53 import org.onap.bbs.event.processor.model.ImmutableMetadataListAaiObject;
 
  54 import org.onap.bbs.event.processor.model.ImmutablePnfAaiObject;
 
  55 import org.onap.bbs.event.processor.model.ImmutablePropertyAaiObject;
 
  56 import org.onap.bbs.event.processor.model.ImmutableRelationshipDataEntryAaiObject;
 
  57 import org.onap.bbs.event.processor.model.ImmutableRelationshipEntryAaiObject;
 
  58 import org.onap.bbs.event.processor.model.ImmutableRelationshipListAaiObject;
 
  59 import org.onap.bbs.event.processor.model.ImmutableServiceInstanceAaiObject;
 
  60 import org.onap.bbs.event.processor.model.MetadataListAaiObject;
 
  61 import org.onap.bbs.event.processor.model.PnfAaiObject;
 
  62 import org.onap.bbs.event.processor.model.RelationshipListAaiObject;
 
  63 import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 
  64 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 
  65 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
 
  66 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
 
  67 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 
  68 import org.springframework.http.HttpStatus;
 
  70 import reactor.core.publisher.Flux;
 
  71 import reactor.core.publisher.Mono;
 
  72 import reactor.test.StepVerifier;
 
  74 @DisplayName("CPE Authentication Pipeline Unit-Tests")
 
  75 class CpeAuthenticationPipelineTest {
 
         // Unit tests for CpeAuthenticationPipeline. The tests in this class replace every
         // collaborator with a Mockito mock and inspect the reactive output with StepVerifier.
 
         // Object under test.
  77     private CpeAuthenticationPipeline pipeline;
 
         // Mocked collaborators that are handed to the pipeline constructor.
  78     private ApplicationConfiguration configuration;
 
  79     private DmaapCpeAuthenticationConsumerTask consumerTask;
 
  80     private DmaapPublisherTask publisherTask;
 
  81     private AaiClientTask aaiClientTask;
 
         // Mocked publisher reply; individual tests stub its status code.
  83     private HttpResponse httpResponse;
 
  88         httpResponse = Mockito.mock(HttpResponse.class);
 
             // All collaborators are plain Mockito mocks, so each test stubs only the calls it needs.
  90         configuration = Mockito.mock(ApplicationConfiguration.class);
 
  91         consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
 
  92         publisherTask = Mockito.mock(DmaapPublisherTask.class);
 
  93         aaiClientTask = Mockito.mock(AaiClientTask.class);
 
             // Fixed values served by the mocked configuration (control-loop / policy settings).
  95         when(configuration.getCpeAuthenticationCloseLoopControlName())
 
  96                 .thenReturn("controlName");
 
  97         when(configuration.getCpeAuthenticationCloseLoopPolicyScope())
 
  98                 .thenReturn("policyScope");
 
  99         when(configuration.getPolicyVersion())
 
 100                 .thenReturn("1.0.0");
 
             // NOTE(review): no return value is visible for getCloseLoopTargetType() at this point;
             // confirm the stub is completed with a thenReturn(...) in the real file.
 101         when(configuration.getCloseLoopTargetType())
 
 103         when(configuration.getCloseLoopEventStatus())
 
 104                 .thenReturn("ONSET");
 
 105         when(configuration.getCloseLoopVersion())
 
 106                 .thenReturn("1.0.2");
 
 107         when(configuration.getCloseLoopTarget())
 
 108                 .thenReturn("CL-Target");
 
 109         when(configuration.getCloseLoopOriginator())
 
 110                 .thenReturn("DCAE-BBS-ep");
 
             // Build the pipeline from the mocks; the last argument is a fresh, empty HashMap.
             // NOTE(review): the role of that map is not visible here - check the pipeline constructor.
 112         pipeline = new CpeAuthenticationPipeline(configuration, consumerTask,
 
 113                 publisherTask, aaiClientTask, new HashMap<>());
 
 117     void handleEmptyResponseFromDmaap() throws SSLException {
 
             // Scenario: the consumer task fails with EmptyDmaapResponseException for any task name.
             // Expectation: neither the A&AI client nor the publisher mock is touched.
 119         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 120         when(consumerTask.execute(anyString()))
 
 121                 .thenReturn(Flux.error(new EmptyDmaapResponseException("Mock empty")));
 
             // NOTE(review): the terminal StepVerifier expectation is not visible after
             // expectSubscription(); confirm how the stream is expected to end.
 123         StepVerifier.create(pipeline.executePipeline())
 
 124                 .expectSubscription()
 
             // No downstream collaborator may have been called.
 127         verifyZeroInteractions(aaiClientTask);
 
 128         verifyZeroInteractions(publisherTask);
 
 132     void noResponseFromDmaap_PipelineTimesOut() throws SSLException {
 
             // Scenario: the configured timeout is 1 second and the consumer returns Flux.never(),
             // so no event is ever emitted.
             // Expectation: neither the A&AI client nor the publisher mock is touched.
 135         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 136         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 137                 .thenReturn(Flux.never());
 
             // NOTE(review): the terminal StepVerifier expectation is not visible after
             // expectSubscription(); confirm how the timed-out stream is expected to end.
 140         StepVerifier.create(pipeline.executePipeline())
 
 141                 .expectSubscription()
 
 144         verifyZeroInteractions(aaiClientTask);
 
 145         verifyZeroInteractions(publisherTask);
 
 149     void noResponseFromAai_PipelineTimesOut() throws SSLException {
 
             // Scenario: one event is consumed, but the PNF retrieval returns Mono.never() while the
             // configured timeout is 1 second.
             // Expectation: the publisher mock is never touched.
 151         String pnfName = "olt1";
 
 152         final String oldAuthenticationState = "outOfService";
 
 153         final String newAuthenticationState = "inService";
 
 154         final String stateInterface = "stateInterface";
 
 155         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 156         final String swVersion = "1.2";
 
 158         // Prepare stubbed replies
 
             // NOTE(review): the builder chain below has no visible terminator - confirm .build().
 159         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 160                 .correlationId(pnfName)
 
 161                 .oldAuthenticationState(oldAuthenticationState)
 
 162                 .newAuthenticationState(newAuthenticationState)
 
 163                 .stateInterface(stateInterface)
 
 164                 .rgwMacAddress(rgwMacAddress)
 
 165                 .swVersion(swVersion)
 
 169         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 170         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
             // A&AI never answers: the PNF lookup yields a Mono that emits nothing.
 171         when(aaiClientTask.executePnfRetrieval(anyString(), anyString())).thenReturn(Mono.never());
 
 174         StepVerifier.create(pipeline.executePipeline())
 
 175                 .expectSubscription()
 
 178         verifyZeroInteractions(publisherTask);
 
 182     void noResponseWhilePublishing_PipelineTimesOut() throws SSLException {
 
             // Scenario: the event is consumed and both A&AI lookups answer, but the publisher
             // returns Mono.never() while the configured timeout is 1 second.
             // Expectation: the publisher was invoked exactly once.
 184         String pnfName = "olt1";
 
 185         final String oldAuthenticationState = "outOfService";
 
 186         final String newAuthenticationState = "inService";
 
 187         final String stateInterface = "stateInterface";
 
 188         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 189         final String swVersion = "1.2";
 
 190         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 192         // Prepare stubbed replies
 
 193         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 194                 .correlationId(pnfName)
 
 195                 .oldAuthenticationState(oldAuthenticationState)
 
 196                 .newAuthenticationState(newAuthenticationState)
 
 197                 .stateInterface(stateInterface)
 
 198                 .rgwMacAddress(rgwMacAddress)
 
 199                 .swVersion(swVersion)
 
             // The service instance carries the same MAC address as the event.
 202         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
 
 203         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 204                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
 
             // URL used as the exact-match argument of the service-instance stub below.
 207         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 208                 hsiCfsServiceInstance.getServiceInstanceId());
 
 210         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(1);
 
 211         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 213         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 214                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the receiver/when(...) opening of the next stub is not visible here.
 217                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 218                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
             // The publisher never answers.
 220         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.never());
 
 222         // Execute the pipeline
 
 223         StepVerifier.create(pipeline.executePipeline())
 
 224                 .expectSubscription()
 
             // verify(...) without a mode checks for exactly one invocation.
 227         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 231     void singleCorrectEvent_handleSuccessfully() throws SSLException {
 
             // Scenario: one event is consumed, both A&AI lookups answer and the publisher replies
             // with a mocked response whose status code is HTTP 200.
             // Expectation: one emitted response with status OK and exactly one publish call.
 233         String pnfName = "olt1";
 
 234         final String oldAuthenticationState = "outOfService";
 
 235         final String newAuthenticationState = "inService";
 
 236         final String stateInterface = "stateInterface";
 
 237         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 238         final String swVersion = "1.2";
 
 239         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 241         // Prepare stubbed replies
 
 242         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 243                 .correlationId(pnfName)
 
 244                 .oldAuthenticationState(oldAuthenticationState)
 
 245                 .newAuthenticationState(newAuthenticationState)
 
 246                 .stateInterface(stateInterface)
 
 247                 .rgwMacAddress(rgwMacAddress)
 
 248                 .swVersion(swVersion)
 
             // The service instance carries the same MAC address as the event.
 251         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName, hsiCfsServiceInstanceId);
 
 252         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 253                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName, rgwMacAddress);
 
             // URL used as the exact-match argument of the service-instance stub below.
 256         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 257                 hsiCfsServiceInstance.getServiceInstanceId());
 
 259         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 260         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 262         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 263                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the receiver/when(...) opening of the next stub is not visible here.
 266                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 267                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 269         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 270         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 272         // Execute the pipeline
 
 273         StepVerifier.create(pipeline.executePipeline())
 
 274                 .expectSubscription()
 
 275                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
             // verify(...) without a mode checks for exactly one invocation.
 278         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 282     void twoCorrectEvents_handleSuccessfully() throws SSLException {
 
             // Scenario: two events for two different PNFs are consumed; each PNF and each service
             // instance is stubbed on its own exact URL, and the publisher replies with HTTP 200.
             // Expectation: two emitted OK responses and exactly two publish calls.
 284         String pnfName1 = "olt1";
 
 285         String pnfName2 = "olt2";
 
 286         final String oldAuthenticationState = "outOfService";
 
 287         final String newAuthenticationState = "inService";
 
 288         final String stateInterface = "stateInterface";
 
 289         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
 
 290         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
 
 291         final String swVersion = "1.2";
 
 292         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 293         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 295         // Prepare stubbed replies
 
 296         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 297                 .correlationId(pnfName1)
 
 298                 .oldAuthenticationState(oldAuthenticationState)
 
 299                 .newAuthenticationState(newAuthenticationState)
 
 300                 .stateInterface(stateInterface)
 
 301                 .rgwMacAddress(rgwMacAddress1)
 
 302                 .swVersion(swVersion)
 
 304         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 305                 .correlationId(pnfName2)
 
 306                 .oldAuthenticationState(oldAuthenticationState)
 
 307                 .newAuthenticationState(newAuthenticationState)
 
 308                 .stateInterface(stateInterface)
 
 309                 .rgwMacAddress(rgwMacAddress2)
 
 310                 .swVersion(swVersion)
 
             // Each service instance carries the MAC address of its own event.
 313         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
 
 314         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
 
 315         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 316                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
 
 317         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 318                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2, rgwMacAddress2);
 
             // URLs used as exact-match arguments of the stubs below.
 321         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 322         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 323         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 324                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 325         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 326                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 328         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 329         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 330                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 332         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 333         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the receiver/when(...) openings of the next two stubs are not visible here.
 336                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 337                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 339                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 340                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 342         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 343         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 345         // Execute the pipeline
 
 346         StepVerifier.create(pipeline.executePipeline())
 
 347                 .expectSubscription()
 
 348                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
 349                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
             // One publish call per event.
 352         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
 
 356     void singleEvent_withPnfErrorReply_handleGracefully() throws SSLException {
 
             // Scenario: one event is consumed, but the PNF retrieval fails with AaiTaskException.
             // Expectation: exactly one PNF retrieval, no other A&AI interaction, publisher untouched.
 358         String pnfName = "olt1";
 
 359         final String oldAuthenticationState = "outOfService";
 
 360         final String newAuthenticationState = "inService";
 
 361         final String stateInterface = "stateInterface";
 
 362         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 363         final String swVersion = "1.2";
 
 365         // Prepare stubbed replies
 
 366         CpeAuthenticationConsumerDmaapModel event = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 367                 .correlationId(pnfName)
 
 368                 .oldAuthenticationState(oldAuthenticationState)
 
 369                 .newAuthenticationState(newAuthenticationState)
 
 370                 .stateInterface(stateInterface)
 
 371                 .rgwMacAddress(rgwMacAddress)
 
 372                 .swVersion(swVersion)
 
 376         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 377         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME)).thenReturn(Flux.just(event));
 
 378         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 379                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
 381         // Execute the pipeline
 
             // NOTE(review): the terminal StepVerifier expectation is not visible after
             // expectSubscription(); confirm how the stream is expected to end.
 382         StepVerifier.create(pipeline.executePipeline())
 
 383                 .expectSubscription()
 
             // The failed PNF lookup must be the only call made on the A&AI client.
 386         verify(aaiClientTask).executePnfRetrieval(anyString(), anyString());
 
 387         verifyNoMoreInteractions(aaiClientTask);
 
 388         verifyZeroInteractions(publisherTask);
 
 392     void twoEvents_FirstOk_SecondUnmatchedMac_handleCorrectOnly() throws SSLException {
 
             // Scenario: two events are consumed; the service instance returned for the second one
             // stores a MAC address value that differs from the MAC address in the second event.
             // Expectation: a single emitted OK response and exactly one publish call.
 394         String pnfName1 = "olt1";
 
 395         String pnfName2 = "olt2";
 
 396         final String oldAuthenticationState = "outOfService";
 
 397         final String newAuthenticationState = "inService";
 
 398         final String stateInterface = "stateInterface";
 
 399         final String rgwMacAddress1 = "00:0a:95:8d:78:16";
 
 400         final String rgwMacAddress2 = "00:0a:95:8d:78:17";
 
 401         final String swVersion = "1.2";
 
 402         String hsiCfsServiceInstanceId1 = UUID.randomUUID().toString();
 
 403         String hsiCfsServiceInstanceId2 = UUID.randomUUID().toString();
 
 405         // Prepare stubbed replies
 
 406         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 407                 .correlationId(pnfName1)
 
 408                 .oldAuthenticationState(oldAuthenticationState)
 
 409                 .newAuthenticationState(newAuthenticationState)
 
 410                 .stateInterface(stateInterface)
 
 411                 .rgwMacAddress(rgwMacAddress1)
 
 412                 .swVersion(swVersion)
 
 414         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 415                 .correlationId(pnfName2)
 
 416                 .oldAuthenticationState(oldAuthenticationState)
 
 417                 .newAuthenticationState(newAuthenticationState)
 
 418                 .stateInterface(stateInterface)
 
 419                 .rgwMacAddress(rgwMacAddress2)
 
 420                 .swVersion(swVersion)
 
 423         PnfAaiObject pnfAaiObject1 = constructPnfObject(pnfName1, hsiCfsServiceInstanceId1);
 
 424         PnfAaiObject pnfAaiObject2 = constructPnfObject(pnfName2, hsiCfsServiceInstanceId2);
 
 425         ServiceInstanceAaiObject hsiCfsServiceInstance1 =
 
 426                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId1, pnfName1, rgwMacAddress1);
 
             // Deliberate mismatch: the stored value is not rgwMacAddress2.
 427         ServiceInstanceAaiObject hsiCfsServiceInstance2 =
 
 428                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId2, pnfName2,
 
 429                         "Having unmatched RGW MAC address");
 
             // URLs used as exact-match arguments of the stubs below.
 432         String pnfUrl1 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName1);
 
 433         String pnfUrl2 = String.format("/aai/v14/network/pnfs/pnf/%s?depth=all", pnfName2);
 
 434         String cfsUrl1 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 435                 hsiCfsServiceInstance1.getServiceInstanceId());
 
 436         String cfsUrl2 = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 437                 hsiCfsServiceInstance2.getServiceInstanceId());
 
 439         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 440         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 441                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
 443         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl1)).thenReturn(Mono.just(pnfAaiObject1));
 
 444         when(aaiClientTask.executePnfRetrieval(RETRIEVE_PNF_TASK_NAME, pnfUrl2)).thenReturn(Mono.just(pnfAaiObject2));
 
             // NOTE(review): the receiver/when(...) openings of the next two stubs are not visible here.
 447                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl1))
 
 448                 .thenReturn(Mono.just(hsiCfsServiceInstance1));
 
 450                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
 
 451                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
 453         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 454         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 456         // Execute the pipeline
 
 457         StepVerifier.create(pipeline.executePipeline())
 
 458                 .expectSubscription()
 
 459                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
             // Only one of the two events may reach the publisher.
 462         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 466     void twoEvents_firstOk_secondWithPnfErrorReply_handleCorrectOnly() throws SSLException {
 
             // Scenario: two events are consumed; the first PNF retrieval succeeds and the second
             // one fails with AaiTaskException (consecutive stubbing on the same call).
             // Expectation: one emitted OK response, two PNF retrievals, one service-instance
             // retrieval and exactly one publish call.
 468         String pnfName1 = "olt1";
 
 469         String pnfName2 = "olt2";
 
 470         final String oldAuthenticationState = "outOfService";
 
 471         final String newAuthenticationState = "inService";
 
 472         final String stateInterface = "stateInterface";
 
 473         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 474         final String swVersion = "1.2";
 
 475         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 477         // Prepare stubbed replies
 
 478         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 479                 .correlationId(pnfName1)
 
 480                 .oldAuthenticationState(oldAuthenticationState)
 
 481                 .newAuthenticationState(newAuthenticationState)
 
 482                 .stateInterface(stateInterface)
 
 483                 .rgwMacAddress(rgwMacAddress)
 
 484                 .swVersion(swVersion)
 
 486         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 487                 .correlationId(pnfName2)
 
 488                 .oldAuthenticationState(oldAuthenticationState)
 
 489                 .newAuthenticationState(newAuthenticationState)
 
 490                 .stateInterface(stateInterface)
 
 491                 .rgwMacAddress(rgwMacAddress)
 
 492                 .swVersion(swVersion)
 
             // Only the first PNF gets A&AI fixtures; the second lookup is stubbed to fail.
 495         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName1, hsiCfsServiceInstanceId);
 
 496         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 497                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName1, rgwMacAddress);
 
 500         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 501                 hsiCfsServiceInstance.getServiceInstanceId());
 
 503         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 504         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 505                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // First invocation returns the PNF object, the next one returns the error.
 506         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 507                 .thenReturn(Mono.just(pnfAaiObject))
 
 508                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")));
 
             // NOTE(review): the receiver/when(...) opening of the next stub is not visible here.
 510                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 511                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 513         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 514         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 516         // Execute the pipeline
 
 517         StepVerifier.create(pipeline.executePipeline())
 
 518                 .expectSubscription()
 
 519                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
             // Both events trigger a PNF lookup; only one continues to the later steps.
 522         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
 
 523         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 524         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 528     void twoEvents_firstWithPnfErrorReply_secondOk_handleCorrectOnly() throws SSLException {
 
             // Scenario: two events are consumed; the first PNF retrieval fails with
             // AaiTaskException and the second one succeeds (consecutive stubbing on the same call).
             // Expectation: one emitted OK response, two PNF retrievals, one service-instance
             // retrieval and exactly one publish call.
 530         String pnfName1 = "olt1";
 
 531         String pnfName2 = "olt2";
 
 532         final String oldAuthenticationState = "outOfService";
 
 533         final String newAuthenticationState = "inService";
 
 534         final String stateInterface = "stateInterface";
 
 535         final String rgwMacAddress = "00:0a:95:8d:78:16";
 
 536         final String swVersion = "1.2";
 
 537         String hsiCfsServiceInstanceId = UUID.randomUUID().toString();
 
 539         // Prepare stubbed replies
 
 540         CpeAuthenticationConsumerDmaapModel firstEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 541                 .correlationId(pnfName1)
 
 542                 .oldAuthenticationState(oldAuthenticationState)
 
 543                 .newAuthenticationState(newAuthenticationState)
 
 544                 .stateInterface(stateInterface)
 
 545                 .rgwMacAddress(rgwMacAddress)
 
 546                 .swVersion(swVersion)
 
 548         CpeAuthenticationConsumerDmaapModel secondEvent = ImmutableCpeAuthenticationConsumerDmaapModel.builder()
 
 549                 .correlationId(pnfName2)
 
 550                 .oldAuthenticationState(oldAuthenticationState)
 
 551                 .newAuthenticationState(newAuthenticationState)
 
 552                 .stateInterface(stateInterface)
 
 553                 .rgwMacAddress(rgwMacAddress)
 
 554                 .swVersion(swVersion)
 
             // Only the second PNF gets A&AI fixtures; the first lookup is stubbed to fail.
 557         PnfAaiObject pnfAaiObject = constructPnfObject(pnfName2, hsiCfsServiceInstanceId);
 
 558         ServiceInstanceAaiObject hsiCfsServiceInstance =
 
 559                 constructHsiCfsServiceInstanceObject(hsiCfsServiceInstanceId, pnfName2, rgwMacAddress);
 
 562         String cfsUrl = String.format("/aai/v14/nodes/service-instances/service-instance/%s?depth=all",
 
 563                 hsiCfsServiceInstance.getServiceInstanceId());
 
 565         when(configuration.getPipelinesTimeoutInSeconds()).thenReturn(10);
 
 566         when(consumerTask.execute(CONSUME_CPE_AUTHENTICATION_TASK_NAME))
 
 567                 .thenReturn(Flux.fromIterable(Arrays.asList(firstEvent, secondEvent)));
 
             // First invocation returns the error, the next one returns the PNF object.
 568         when(aaiClientTask.executePnfRetrieval(anyString(), anyString()))
 
 569                 .thenReturn(Mono.error(new AaiTaskException("Mock A&AI exception")))
 
 570                 .thenReturn(Mono.just(pnfAaiObject));
 
             // NOTE(review): the receiver/when(...) opening of the next stub is not visible here.
 572                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
 
 573                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
 575         when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
 
 576         when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
 578         // Execute the pipeline
 
 579         StepVerifier.create(pipeline.executePipeline())
 
 580                 .expectSubscription()
 
 581                 .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
 
             // Both events trigger a PNF lookup; only one continues to the later steps.
 584         verify(aaiClientTask, times(2))
 
 585                 .executePnfRetrieval(anyString(), anyString());
 
 586         verify(aaiClientTask).executeServiceInstanceRetrieval(anyString(), anyString());
 
 587         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
 
 590     private PnfAaiObject constructPnfObject(String pnfName, String hsiCfsServiceInstanceId) {
 
             // Test fixture: builds a PnfAaiObject that is flagged as in maintenance and owns a
             // relationship list with a single "service-instance" entry.
             // hsiCfsServiceInstanceId is used both in the related link and as the value of the
             // "service-instance.service-instance-id" relationship key.
             // NOTE(review): no use of pnfName is visible in this method body - confirm that the
             // builder sets it in the real file.
 
 592         // Build Relationship Data
 
 593         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 594                 ImmutableRelationshipEntryAaiObject.builder()
 
 595                         .relatedTo("service-instance")
 
 596                         .relatedLink("/aai/v14/business/customers/customer/Demonstration/service-subscriptions"
 
 597                                 + "/service-subscription/BBS-CFS/service-instances"
 
 598                                 + "/service-instance/" + hsiCfsServiceInstanceId)
 
 599                         .relationshipLabel("org.onap.relationships.inventory.ComposedOf")
 
                             // Three key/value pairs: customer id, service type, service-instance id.
 600                         .relationshipData(Arrays.asList(
 
 601                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 602                                         .relationshipKey("customer.global-customer-id")
 
 603                                         .relationshipValue("Demonstration").build(),
 
 604                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 605                                         .relationshipKey("service-subscription.service-type")
 
 606                                         .relationshipValue("BBS-CFS").build(),
 
 607                                 ImmutableRelationshipDataEntryAaiObject.builder()
 
 608                                         .relationshipKey("service-instance.service-instance-id")
 
 609                                         .relationshipValue(hsiCfsServiceInstanceId).build())
 
 611                         .relatedToProperties(Collections.singletonList(
 
 612                                 ImmutablePropertyAaiObject.builder()
 
 613                                         .propertyKey("service-instance.service-instance-name")
 
 614                                         .propertyValue("bbs-instance").build())
 
             // Wrap the single entry into the relationship list expected by the PNF builder.
 618         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 619                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
 622         // Finally construct PNF object data
 
 623         return ImmutablePnfAaiObject.builder()
 
 625                 .isInMaintenance(true)
 
 626                 .relationshipListAaiObject(relationshipListAaiObject)
 
 630     private ServiceInstanceAaiObject constructHsiCfsServiceInstanceObject(String hsiCfsServiceInstanceId,
 
             // Test fixture: builds a ServiceInstanceAaiObject with the given id, orchestration
             // status "active", one relationship entry linking to "/pnfs/pnf/" + pnfName, and one
             // metadata entry named "rgw-mac-address" holding rgwMacAddress.
             // NOTE(review): pnfName is used below and passed by every caller, but its parameter
             // declaration is not visible in this signature - confirm against the real file.
 632                                                                           String rgwMacAddress) {
 
 633         String orchestrationStatus = "active";
 
             // Relationship from the service instance back to its PNF.
 635         RelationshipListAaiObject.RelationshipEntryAaiObject relationshipEntry =
 
 636                 ImmutableRelationshipEntryAaiObject.builder()
 
 638                         .relatedLink("/pnfs/pnf/" + pnfName)
 
 639                         .relationshipData(Collections.singletonList(ImmutableRelationshipDataEntryAaiObject.builder()
 
 640                                 .relationshipKey("pnf.pnf-name")
 
 641                                 .relationshipValue(pnfName).build()))
 
 644         RelationshipListAaiObject relationshipListAaiObject = ImmutableRelationshipListAaiObject.builder()
 
 645                 .relationshipEntries(Collections.singletonList(relationshipEntry))
 
             // Metadata entry that stores the MAC address under the "rgw-mac-address" name.
 648         MetadataListAaiObject.MetadataEntryAaiObject metadataEntry =
 
 649                 ImmutableMetadataEntryAaiObject.builder()
 
 650                         .metaname("rgw-mac-address")
 
 651                         .metavalue(rgwMacAddress)
 
 654         MetadataListAaiObject metadataListAaiObject = ImmutableMetadataListAaiObject.builder()
 
 655                 .metadataEntries(Collections.singletonList(metadataEntry))
 
 658         // Finally construct Service Instance object data
 
 659         return ImmutableServiceInstanceAaiObject.builder()
 
 660                 .serviceInstanceId(hsiCfsServiceInstanceId)
 
 661                 .orchestrationStatus(orchestrationStatus)
 
 662                 .relationshipListAaiObject(relationshipListAaiObject)
 
 663                 .metadataListAaiObject(metadataListAaiObject)